Skip to main content

oxideav_rtmp/
adapter.rs

1//! `PacketSource` adapter wrapping an [`RtmpSession`].
2//!
3//! Bridges this crate's protocol-native [`StreamPacket`] (FLV-style
4//! audio + video tags, plus `onMetaData` AMF0 objects) into the
5//! workspace's [`oxideav_core::Packet`] shape so a
6//! [`oxideav_core::registry::SourceRegistry`] can dispatch
7//! `rtmp://` URIs into the standard pipeline executor.
8//!
9//! The adapter is **listen-style** by URL convention:
10//!
11//! ```text
12//! rtmp://0.0.0.0:1935/live/secret-key
13//!         ^^^^^^^^^^^^^ local TCP bind
14//!                       ^^^^ ^^^^^^^^^^ app + stream-name the
15//!                                       publisher must announce
16//! ```
17//!
18//! The opener binds on the URL's `host:port`, accepts **one**
19//! publisher, verifies that the announced `app` + `stream_name`
20//! match the URL path (rejecting otherwise), then returns a
21//! [`RtmpPacketSource`] the executor pumps via `next_packet()`.
22//! The historical [`RtmpServer`](crate::RtmpServer) /
23//! [`RtmpClient`](crate::RtmpClient) API is unchanged — this
24//! adapter is purely additive.
25//!
26//! # Stream layout
27//!
//! At most two streams, each registered when its first tag is
//! observed, and both opened with TimeBase
//! 1/1_000_000_000 (nanoseconds). RTMP chunks carry a millisecond
30//! `timestamp` and Enhanced-RTMP-v2 ModEx `TimestampOffsetNano`
31//! entries add sub-millisecond precision; the adapter folds both
32//! into the same nanosecond-resolution timeline so a downstream
33//! consumer reads a single uniform clock:
34//!
35//! * **Stream 0 — audio.** Codec id derived lazily from the first
36//!   audio tag the publisher sends (`aac` for AAC, `mp3` for MP3,
//!   etc; see [`audio_codec_id`]). If the publisher never sends
//!   audio, no audio stream entry is reported.
//! * **Stream 1 — video.** Codec id from the first video tag
//!   (`h264` for AVC, `h263`, `vp6f`, `vp6a`, and the `flashsv` /
//!   `flashsv2` screen codecs; see [`video_codec_id`]).
42//!
43//! The opener buffers up to [`PROBE_LIMIT`] packets after the
44//! handshake completes so it can observe at least one of each
45//! kind before returning. Buffered packets are replayed in order
46//! by `next_packet()` before any new reads. If the publisher
47//! disconnects during probing, whichever streams were observed
48//! are reported.
49//!
50//! # Timestamps
51//!
52//! RTMP carries a single 32-bit `timestamp` per chunk, expressed
53//! in milliseconds. Enhanced RTMP v2 lets a sender prepend a
54//! `TimestampOffsetNano` ModEx entry (`enhanced-rtmp-v2.pdf`
55//! §"ExVideoTagHeader" / §"ExAudioTagHeader") carrying a 0..=999_999
56//! ns offset to be added to the *presentation* time of the current
57//! media message **without altering the core RTMP timestamp**. The
58//! adapter folds these per-message offsets into the nanosecond
59//! [`Packet`] timeline:
60//!
61//! * Audio — `pts = dts = timestamp_ms * 1_000_000 + nano_offset`.
62//! * Video AVC / NALU-FourCC — `dts = timestamp_ms * 1_000_000`
63//!   (decode time, unmodified per spec); `pts = (timestamp_ms +
64//!   composition_time_ms) * 1_000_000 + nano_offset`. The
65//!   composition-time offset stays in milliseconds because that is
66//!   the wire field; the nanosecond modifier rides on top.
//! * Non-NALU video without CTS — `dts = timestamp_ms * 1_000_000`
//!   (unmodified); `pts = dts + nano_offset`.
69//!
70//! Multiple `TimestampOffsetNano` entries on the same tag are
71//! summed via [`VideoTag::timestamp_offset_nano`] /
72//! [`AudioTag::timestamp_offset_nano`] before folding, matching
73//! the per-tag accessor on those types.
74//!
75//! # Metadata variants
76//!
77//! `StreamPacket::Metadata(_)` carries an AMF0 `onMetaData`
78//! object — useful to publishers but not a media packet. We
79//! swallow it after recording its contents into
80//! [`PacketSource::metadata`] (string-flattening any scalar
81//! key/value pairs) and continue reading.
82
83use std::collections::VecDeque;
84use std::net::ToSocketAddrs;
85use std::time::Duration;
86
87use oxideav_core::{
88    BytesSource, CodecId, CodecParameters, Error as CoreError, Packet, PacketSource,
89    Result as CoreResult, SourceRegistry, StreamInfo, TimeBase,
90};
91
92use crate::amf::Amf0Value;
93use crate::error::{Error as RtmpError, Result as RtmpResult};
94use crate::flv::{
95    self, AudioTag, VideoTag, AAC_PACKET_TYPE_SEQUENCE_HEADER, AUDIO_FORMAT_AAC,
96    AUDIO_FORMAT_ADPCM, AUDIO_FORMAT_G711_ALAW, AUDIO_FORMAT_G711_MULAW, AUDIO_FORMAT_MP3,
97    AUDIO_FORMAT_NELLYMOSER, AUDIO_FORMAT_NELLYMOSER_16K_MONO, AUDIO_FORMAT_NELLYMOSER_8K_MONO,
98    AUDIO_FORMAT_PCM_LE, AUDIO_FORMAT_PCM_LE_8BIT, AUDIO_FORMAT_SPEEX, VIDEO_CODEC_AVC,
99    VIDEO_CODEC_H263, VIDEO_CODEC_SCREEN, VIDEO_CODEC_SCREEN_V2, VIDEO_CODEC_VP6, VIDEO_CODEC_VP6A,
100};
101use crate::server::{RtmpServer, RtmpSession, StreamPacket};
102
/// Index of the audio stream exposed by an [`RtmpPacketSource`].
pub const AUDIO_STREAM_INDEX: u32 = 0;
/// Index of the video stream exposed by an [`RtmpPacketSource`];
/// it is the slot directly after the audio stream.
pub const VIDEO_STREAM_INDEX: u32 = AUDIO_STREAM_INDEX + 1;
107/// Time base used for both streams: 1/1_000_000_000 (nanoseconds).
108///
109/// RTMP chunks carry a 32-bit millisecond `timestamp` while
110/// Enhanced RTMP v2 ModEx `TimestampOffsetNano` entries add
111/// 0..=999_999 ns of sub-millisecond precision. A nanosecond
112/// timeline lets [`audio_to_packet`] / [`video_to_packet`] fold
113/// both into a single uniform `Packet::pts` / `Packet::dts`
114/// without losing precision (per `enhanced-rtmp-v2.pdf`
115/// §"ExVideoTagHeader" / §"ExAudioTagHeader").
116pub const RTMP_TIME_BASE: TimeBase = TimeBase::new(1, 1_000_000_000);
117
/// Nanoseconds per millisecond: the factor that lifts an RTMP
/// millisecond timestamp onto the [`RTMP_TIME_BASE`] timeline.
pub const RTMP_MS_TO_NS: i64 = 1_000 * 1_000;

/// Upper bound on the number of session events consumed while
/// probing for stream codecs; once reached, probing stops and
/// whatever has been observed so far is reported.
pub const PROBE_LIMIT: usize = 32;

/// Default per-read socket timeout used while probing. It keeps
/// the opener from blocking forever on a publisher that never
/// sends the second stream type.
pub const PROBE_READ_TIMEOUT: Duration = Duration::from_millis(10_000);
130
131/// Pre-buffered packet emitted by the probing phase, paired with
132/// its target stream index so we can replay in arrival order.
133struct BufferedPacket {
134    packet: Packet,
135    /// True for audio (stream 0), false for video (stream 1).
136    /// Kept as a flag to keep [`Packet`] free of out-of-band info.
137    #[allow(dead_code)]
138    is_audio: bool,
139}
140
141/// `PacketSource` wrapping an [`RtmpSession`].
142///
143/// Constructed by [`open_rtmp`] (the registry opener) or directly
144/// from a session via [`RtmpPacketSource::from_session`] — the
145/// latter is useful for callers driving their own
146/// [`RtmpServer::accept`] loop who want the typed-packet
147/// conversion without the listen-and-validate flow.
148pub struct RtmpPacketSource {
149    session: RtmpSession,
150    streams: Vec<StreamInfo>,
151    metadata: Vec<(String, String)>,
152    buffered: VecDeque<BufferedPacket>,
153    /// True when the underlying session has reported clean EOS
154    /// (peer sent `closeStream` / `deleteStream` / `FCUnpublish`,
155    /// or the TCP socket closed). After this `next_packet` keeps
156    /// returning [`CoreError::Eof`].
157    ended: bool,
158}
159
160impl RtmpPacketSource {
161    /// Wrap a freshly-accepted [`RtmpSession`] without any
162    /// probing — `streams()` will be empty until the first audio
163    /// / video packet flows. Suitable for callers who already
164    /// know the stream shape (e.g. they are also the publisher).
165    pub fn from_session(session: RtmpSession) -> Self {
166        Self {
167            session,
168            streams: Vec::new(),
169            metadata: Vec::new(),
170            buffered: VecDeque::new(),
171            ended: false,
172        }
173    }
174
175    /// Wrap a session and run the probing loop now: read up to
176    /// [`PROBE_LIMIT`] packets, populate `streams()` with the
177    /// observed audio + video codec ids, and buffer those packets
178    /// for later [`next_packet`](Self::next_packet) calls.
179    ///
180    /// `read_timeout` bounds individual reads so a publisher that
181    /// only ever sends one stream-type doesn't stall probing
182    /// indefinitely. `None` keeps the socket blocking with no
183    /// timeout — only safe when the caller knows the publisher
184    /// will eventually send both kinds.
185    pub fn from_session_with_probe(
186        mut session: RtmpSession,
187        read_timeout: Option<Duration>,
188    ) -> RtmpResult<Self> {
189        if let Some(d) = read_timeout {
190            // Best-effort — failure here is informational, not fatal.
191            let _ = session.set_read_timeout(Some(d));
192        }
193        let mut streams: Vec<StreamInfo> = Vec::new();
194        let mut metadata: Vec<(String, String)> = Vec::new();
195        let mut buffered: VecDeque<BufferedPacket> = VecDeque::new();
196        let mut have_audio = false;
197        let mut have_video = false;
198        let mut ended = false;
199
200        for _ in 0..PROBE_LIMIT {
201            if have_audio && have_video {
202                break;
203            }
204            let next = match session.next_packet() {
205                Ok(Some(p)) => p,
206                Ok(None) => {
207                    ended = true;
208                    break;
209                }
210                Err(RtmpError::Io(e))
211                    if matches!(
212                        e.kind(),
213                        std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
214                    ) =>
215                {
216                    // No more packets within the deadline — accept
217                    // whatever we have and bail out of probing.
218                    break;
219                }
220                Err(e) => return Err(e),
221            };
222            match next {
223                StreamPacket::Audio { timestamp, tag } => {
224                    if !have_audio {
225                        let params = audio_codec_params(&tag);
226                        streams.push(StreamInfo {
227                            index: AUDIO_STREAM_INDEX,
228                            time_base: RTMP_TIME_BASE,
229                            duration: None,
230                            start_time: None,
231                            params,
232                        });
233                        have_audio = true;
234                    }
235                    let pkt = audio_to_packet(timestamp, &tag);
236                    buffered.push_back(BufferedPacket {
237                        packet: pkt,
238                        is_audio: true,
239                    });
240                }
241                StreamPacket::Video { timestamp, tag } => {
242                    if !have_video {
243                        let params = video_codec_params(&tag);
244                        streams.push(StreamInfo {
245                            index: VIDEO_STREAM_INDEX,
246                            time_base: RTMP_TIME_BASE,
247                            duration: None,
248                            start_time: None,
249                            params,
250                        });
251                        have_video = true;
252                    }
253                    let pkt = video_to_packet(timestamp, &tag);
254                    buffered.push_back(BufferedPacket {
255                        packet: pkt,
256                        is_audio: false,
257                    });
258                }
259                StreamPacket::Metadata(value) => {
260                    flatten_metadata(&value, &mut metadata);
261                }
262            }
263        }
264
265        // Restore blocking mode for the steady-state phase: we
266        // want long-lived publishers to block on read, not poll.
267        let _ = session.set_read_timeout(None);
268
269        // Stable order: audio (index 0) before video (index 1).
270        streams.sort_by_key(|s| s.index);
271
272        Ok(Self {
273            session,
274            streams,
275            metadata,
276            buffered,
277            ended,
278        })
279    }
280
281    /// Borrow the wrapped session for advanced operations
282    /// (`set_read_timeout`, `peer_addr`, …). Reading directly
283    /// would interfere with the `PacketSource` machinery; prefer
284    /// inspection-only methods.
285    pub fn session(&self) -> &RtmpSession {
286        &self.session
287    }
288}
289
290impl PacketSource for RtmpPacketSource {
291    fn streams(&self) -> &[StreamInfo] {
292        &self.streams
293    }
294
295    fn next_packet(&mut self) -> CoreResult<Packet> {
296        if let Some(buf) = self.buffered.pop_front() {
297            return Ok(buf.packet);
298        }
299        if self.ended {
300            return Err(CoreError::Eof);
301        }
302        loop {
303            let event = self.session.next_packet().map_err(rtmp_to_core_err)?;
304            match event {
305                Some(StreamPacket::Audio { timestamp, tag }) => {
306                    if self.streams.iter().all(|s| s.index != AUDIO_STREAM_INDEX) {
307                        let params = audio_codec_params(&tag);
308                        self.streams.push(StreamInfo {
309                            index: AUDIO_STREAM_INDEX,
310                            time_base: RTMP_TIME_BASE,
311                            duration: None,
312                            start_time: None,
313                            params,
314                        });
315                        self.streams.sort_by_key(|s| s.index);
316                    }
317                    return Ok(audio_to_packet(timestamp, &tag));
318                }
319                Some(StreamPacket::Video { timestamp, tag }) => {
320                    if self.streams.iter().all(|s| s.index != VIDEO_STREAM_INDEX) {
321                        let params = video_codec_params(&tag);
322                        self.streams.push(StreamInfo {
323                            index: VIDEO_STREAM_INDEX,
324                            time_base: RTMP_TIME_BASE,
325                            duration: None,
326                            start_time: None,
327                            params,
328                        });
329                        self.streams.sort_by_key(|s| s.index);
330                    }
331                    return Ok(video_to_packet(timestamp, &tag));
332                }
333                Some(StreamPacket::Metadata(value)) => {
334                    flatten_metadata(&value, &mut self.metadata);
335                    // Loop again — metadata isn't a media packet.
336                    continue;
337                }
338                None => {
339                    self.ended = true;
340                    return Err(CoreError::Eof);
341                }
342            }
343        }
344    }
345
346    fn metadata(&self) -> &[(String, String)] {
347        &self.metadata
348    }
349
350    fn duration_micros(&self) -> Option<i64> {
351        // Live RTMP push has no a-priori duration.
352        None
353    }
354}
355
356// ────────────────────────── conversion helpers ──────────────────────────
357
358/// Map an FLV audio tag into a [`Packet`] on stream 0.
359///
360/// The FLV tag header byte (sound-format / rate / size / stereo)
361/// is stripped — downstream decoders consume the codec body
362/// directly. For legacy AAC the 1-byte AAC packet-type marker is
363/// retained ahead of the body so the decoder can distinguish
364/// `AudioSpecificConfig` from raw frames; non-AAC legacy payloads
365/// emit just `tag.body`.
366///
367/// For Enhanced RTMP v2 tags (`tag.audio_fourcc.is_some()`) the
368/// full ExHeader + FourCC framing is stripped and the body is
369/// passed through unmodified — the downstream codec consumes
370/// the raw codec bytes (`OpusCodedData` / `Ac3CodedData` /
371/// `Mp3CodedData` / `FlacCodedData` / `AacCodedData` / their
372/// per-codec sequence-header bodies). `PacketTypeSequenceStart`
373/// tags carry `flags.header = true`; `PacketTypeSequenceEnd`
374/// surfaces as an empty `data` with the `header` flag set so a
375/// consumer can route it to an end-of-stream signal.
376///
377/// `pts` and `dts` are emitted on the nanosecond [`RTMP_TIME_BASE`]
378/// timeline: `timestamp_ms * 1_000_000`, plus any
379/// `TimestampOffsetNano` ModEx contributions reported by
380/// [`AudioTag::timestamp_offset_nano`] folded onto the presentation
381/// time (audio has no separate decode time, so both `pts` and `dts`
382/// receive the offset).
383pub fn audio_to_packet(timestamp_ms: u32, tag: &AudioTag) -> Packet {
384    let ts_ns = (timestamp_ms as i64) * RTMP_MS_TO_NS;
385    let nano_offset = tag.timestamp_offset_nano() as i64;
386    let presentation_ns = ts_ns + nano_offset;
387    let (data, is_header) = if tag.audio_fourcc.is_some() {
388        // Enhanced RTMP v2: body is the codec's data verbatim
389        // (per `ExAudioTagBody`). No AAC marker — that's a legacy
390        // discriminator. SequenceStart is the header signal; the
391        // SequenceEnd variant gets `header = true` too so the
392        // downstream can recognise a flush boundary.
393        let header = matches!(
394            tag.ex_packet_type,
395            Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_START) | Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_END)
396        );
397        (tag.body.clone(), header)
398    } else {
399        let mut data = Vec::with_capacity(tag.body.len() + 1);
400        if tag.sound_format == AUDIO_FORMAT_AAC {
401            data.push(tag.aac_packet_type.unwrap_or(flv::AAC_PACKET_TYPE_RAW));
402        }
403        data.extend_from_slice(&tag.body);
404        let header = tag.sound_format == AUDIO_FORMAT_AAC
405            && tag.aac_packet_type == Some(AAC_PACKET_TYPE_SEQUENCE_HEADER);
406        (data, header)
407    };
408    let flags = oxideav_core::packet::PacketFlags {
409        header: is_header,
410        ..Default::default()
411    };
412    Packet {
413        stream_index: AUDIO_STREAM_INDEX,
414        time_base: RTMP_TIME_BASE,
415        pts: Some(presentation_ns),
416        dts: Some(presentation_ns),
417        duration: None,
418        flags,
419        data,
420    }
421}
422
423/// Map an FLV video tag into a [`Packet`] on stream 1.
424///
425/// For AVC, `pts = timestamp + composition_time` and
426/// `dts = timestamp`. The 5-byte FLV/AVC header (frame-type +
427/// codec-id + AVC-packet-type + 24-bit composition_time) is
428/// stripped from `data`. Non-AVC video keeps its body as-is.
429/// The keyframe flag is propagated from the FLV frame-type
430/// nibble; sequence-header packets are flagged `header`.
431///
432/// `pts` and `dts` are emitted on the nanosecond
433/// [`RTMP_TIME_BASE`] timeline. The core RTMP millisecond
434/// `timestamp` is preserved verbatim as `dts =
435/// timestamp_ms * 1_000_000`; the per-message
436/// `TimestampOffsetNano` ModEx sum reported by
437/// [`VideoTag::timestamp_offset_nano`] is added to `pts` only —
438/// per `enhanced-rtmp-v2.pdf` the nanosecond offset adjusts the
439/// *presentation* time of the current media message without
440/// altering the core (decode) timestamp.
441pub fn video_to_packet(timestamp_ms: u32, tag: &VideoTag) -> Packet {
442    let dts_ns = (timestamp_ms as i64) * RTMP_MS_TO_NS;
443    // CTS lives in two places on the wire — AVC's 3-byte
444    // SI24 (legacy), and the three NALU-based Enhanced-RTMP
445    // FourCC variants paired with `CodedFrames`: HEVC (v1),
446    // AVC and VVC (added v2). `parse_video` normalises all of
447    // them into `tag.composition_time`; the non-NALU FourCCs
448    // (`av01`, `vp09`, `vp08`) and the SequenceStart /
449    // SequenceEnd / Metadata / CodedFramesX shapes leave it
450    // zero (per §"ExVideoTagBody" "compositionTimeOffset is
451    // implied to equal zero" — equivalent to "no offset").
452    let has_cts =
453        tag.codec_id == VIDEO_CODEC_AVC || (tag.fourcc.is_some() && tag.composition_time != 0);
454    let cts_ns = if has_cts {
455        (tag.composition_time as i64) * RTMP_MS_TO_NS
456    } else {
457        0
458    };
459    // ModEx `TimestampOffsetNano` (sub-millisecond, 0..=999_999 ns
460    // per spec but the typed accessor returns up to ~16 M as the
461    // raw bytesToUI24 sum across multiple entries) folds onto the
462    // presentation time only.
463    let nano_offset = tag.timestamp_offset_nano() as i64;
464    let pts_ns = dts_ns + cts_ns + nano_offset;
465    // `header` is set for *both* legacy AVC sequence headers and
466    // Enhanced-RTMP `PacketTypeSequenceStart` tags — downstream
467    // consumers can stash the body as `CodecParameters.extradata`
468    // regardless of codec.
469    let is_header = tag.is_avc_sequence_header() || tag.is_ex_sequence_header();
470    // `PacketTypeMetadata` (Enhanced RTMP `colorInfo` etc.) is
471    // not real frame data — surface it as a header-flagged
472    // packet so a downstream consumer can route it to the
473    // codec-parameters / HDR-metadata path instead of the
474    // decoder. Per spec, FrameType bits are ignored when this
475    // is set, so suppress the keyframe flag too.
476    let is_metadata = tag.is_ex_metadata();
477    let flags = oxideav_core::packet::PacketFlags {
478        keyframe: !is_metadata && tag.is_keyframe(),
479        header: is_header || is_metadata,
480        ..Default::default()
481    };
482    Packet {
483        stream_index: VIDEO_STREAM_INDEX,
484        time_base: RTMP_TIME_BASE,
485        pts: Some(pts_ns),
486        dts: Some(dts_ns),
487        duration: None,
488        flags,
489        data: tag.body.clone(),
490    }
491}
492
493/// Map an FLV `sound_format` to an oxideav [`CodecId`]. Returns
494/// `"unknown"` for codecs the workspace doesn't yet name —
495/// downstream the decoder factory will fail to find a match,
496/// which is the right outcome for "FLV says some legacy codec".
497/// Enhanced RTMP v2 FourCC tags go through
498/// [`audio_fourcc_codec_id`] / [`audio_codec_id_for_tag`]; this
499/// helper only handles the legacy single-byte SoundFormat
500/// nibble and returns `"unknown"` for the `ExHeader = 9`
501/// sentinel.
502pub fn audio_codec_id(sound_format: u8) -> CodecId {
503    let s = match sound_format {
504        AUDIO_FORMAT_PCM_LE => "pcm_s16le",
505        AUDIO_FORMAT_ADPCM => "adpcm_swf",
506        AUDIO_FORMAT_MP3 => "mp3",
507        AUDIO_FORMAT_PCM_LE_8BIT => "pcm_u8",
508        AUDIO_FORMAT_NELLYMOSER_16K_MONO => "nellymoser",
509        AUDIO_FORMAT_NELLYMOSER_8K_MONO => "nellymoser",
510        AUDIO_FORMAT_NELLYMOSER => "nellymoser",
511        AUDIO_FORMAT_G711_ALAW => "pcm_alaw",
512        AUDIO_FORMAT_G711_MULAW => "pcm_mulaw",
513        AUDIO_FORMAT_AAC => "aac",
514        AUDIO_FORMAT_SPEEX => "speex",
515        _ => "unknown",
516    };
517    CodecId::new(s)
518}
519
520/// Map an Enhanced RTMP v2 FourCC audio tag (`b"Opus"` /
521/// `b"fLaC"` / `b"ac-3"` / `b"ec-3"` / `b".mp3"` / `b"mp4a"`)
522/// to an oxideav [`CodecId`]. Unknown FourCCs collapse to
523/// `"unknown"`, matching the legacy [`audio_codec_id`] policy.
524pub fn audio_fourcc_codec_id(fourcc: [u8; 4]) -> CodecId {
525    let s = match &fourcc {
526        b"Opus" => "opus",
527        b"fLaC" => "flac",
528        b"ac-3" => "ac3",
529        b"ec-3" => "eac3",
530        b".mp3" => "mp3",
531        b"mp4a" => "aac",
532        _ => "unknown",
533    };
534    CodecId::new(s)
535}
536
537/// Dispatch [`audio_codec_id`] / [`audio_fourcc_codec_id`] off a
538/// parsed [`AudioTag`]: Enhanced RTMP v2 (FourCC) wins when set,
539/// otherwise the legacy single-byte `sound_format` is consulted.
540pub fn audio_codec_id_for_tag(tag: &AudioTag) -> CodecId {
541    if let Some(fcc) = tag.audio_fourcc {
542        audio_fourcc_codec_id(fcc)
543    } else {
544        audio_codec_id(tag.sound_format)
545    }
546}
547
548/// Map an FLV `codec_id` (low nibble of the first video-tag
549/// byte) to an oxideav [`CodecId`]. Legacy single-byte codec IDs
550/// only — Enhanced RTMP FourCCs go through
551/// [`video_codec_id_for_tag`].
552pub fn video_codec_id(codec_id: u8) -> CodecId {
553    let s = match codec_id {
554        VIDEO_CODEC_H263 => "h263",
555        VIDEO_CODEC_SCREEN => "flashsv",
556        VIDEO_CODEC_VP6 => "vp6f",
557        VIDEO_CODEC_VP6A => "vp6a",
558        VIDEO_CODEC_SCREEN_V2 => "flashsv2",
559        VIDEO_CODEC_AVC => "h264",
560        _ => "unknown",
561    };
562    CodecId::new(s)
563}
564
565/// Map an Enhanced-RTMP FourCC video tag to an oxideav
566/// [`CodecId`]. Covers the v1 set (`b"av01"` / `b"vp09"` /
567/// `b"hvc1"`) and the v2 additions (`b"vp08"` / `b"avc1"` /
568/// `b"vvc1"`). Unknown FourCCs (the spec leaves room for future
569/// codecs) collapse to `"unknown"`, matching the legacy
570/// [`video_codec_id`] policy.
571pub fn video_fourcc_codec_id(fourcc: [u8; 4]) -> CodecId {
572    let s = match &fourcc {
573        b"av01" => "av1",
574        b"vp09" => "vp9",
575        b"hvc1" => "hevc",
576        // Enhanced RTMP v2 (Veovera 2026) §"Enhanced Video".
577        b"vp08" => "vp8",
578        b"avc1" => "h264",
579        b"vvc1" => "vvc",
580        _ => "unknown",
581    };
582    CodecId::new(s)
583}
584
585/// Dispatch [`video_codec_id`] / [`video_fourcc_codec_id`] off a
586/// parsed [`VideoTag`]: Enhanced RTMP (FourCC) wins when set,
587/// otherwise the legacy single-byte `codec_id` is consulted.
588pub fn video_codec_id_for_tag(tag: &VideoTag) -> CodecId {
589    if let Some(fcc) = tag.fourcc {
590        video_fourcc_codec_id(fcc)
591    } else {
592        video_codec_id(tag.codec_id)
593    }
594}
595
596/// Build a [`CodecParameters`] for an audio stream from the
597/// first observed FLV audio tag. Sample-rate / channel-count
598/// hints from the tag header are populated when the FLV header
599/// bits are meaningful (per spec they aren't for AAC — the
600/// payload's `AudioSpecificConfig` carries the truth — so we
601/// leave those fields `None` for AAC and let the decoder fill
602/// them in).
603///
604/// For Enhanced RTMP v2 tags (`tag.audio_fourcc.is_some()`) the
605/// codec id is resolved through the FourCC dispatcher and the
606/// legacy SoundRate/Stereo hints are skipped (the spec mandates
607/// they're not interpreted in ExHeader mode). For
608/// `PacketTypeSequenceStart` we copy the codec's sequence-header
609/// body into `extradata` — that's `OpusHead` for Opus,
610/// `fLaC + STREAMINFO` for FLAC, and the AAC `AudioSpecificConfig`
611/// for FourCC-AAC. AC-3 / E-AC-3 / MP3 have no SequenceStart
612/// shape in v2, so `extradata` stays empty for them.
613fn audio_codec_params(tag: &AudioTag) -> CodecParameters {
614    let mut p = CodecParameters::audio(audio_codec_id_for_tag(tag));
615    if tag.audio_fourcc.is_some() {
616        if tag.ex_packet_type == Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_START) {
617            p.extradata = tag.body.clone();
618        }
619        return p;
620    }
621    if tag.sound_format != AUDIO_FORMAT_AAC {
622        // FLV sound_rate: 0=5.5k 1=11k 2=22k 3=44.1k.
623        let rate = match tag.sound_rate {
624            0 => 5_512,
625            1 => 11_025,
626            2 => 22_050,
627            _ => 44_100,
628        };
629        p.sample_rate = Some(rate);
630        p.channels = Some(if tag.stereo { 2 } else { 1 });
631    }
632    if tag.sound_format == AUDIO_FORMAT_AAC
633        && tag.aac_packet_type == Some(AAC_PACKET_TYPE_SEQUENCE_HEADER)
634    {
635        p.extradata = tag.body.clone();
636    }
637    p
638}
639
640/// Build a [`CodecParameters`] for a video stream from the first
641/// observed FLV video tag. For AVC sequence headers we copy the
642/// `AVCDecoderConfigurationRecord` into `extradata` so a
643/// downstream H.264 decoder can find the SPS/PPS without
644/// re-parsing the packet.
645fn video_codec_params(tag: &VideoTag) -> CodecParameters {
646    let mut p = CodecParameters::video(video_codec_id_for_tag(tag));
647    // Legacy AVC: extradata is the `AVCDecoderConfigurationRecord`.
648    // Enhanced RTMP `PacketTypeSequenceStart`: extradata is the
649    // codec's configuration record per Table 4 — `HEVCDecoder
650    // ConfigurationRecord` for `hvc1`, `AV1CodecConfigurationRecord`
651    // for `av01`, `VPCodecConfigurationRecord` for `vp09`. In all
652    // three cases the body is exactly what a downstream
653    // ISO-BMFF-style decoder would expect to receive as
654    // extradata, so we copy it through unmodified.
655    if tag.is_avc_sequence_header() || tag.is_ex_sequence_header() {
656        p.extradata = tag.body.clone();
657    }
658    p
659}
660
661/// Convert a few well-known scalar fields out of an `onMetaData`
662/// AMF0 object into flat string pairs. Anything more elaborate is
663/// dropped — `metadata()` is best-effort, callers needing the full
664/// structure should use [`RtmpServer::accept`] directly.
665fn flatten_metadata(value: &Amf0Value, out: &mut Vec<(String, String)>) {
666    let pairs: &[(String, Amf0Value)] = match value {
667        Amf0Value::Object(p) => p.as_slice(),
668        Amf0Value::EcmaArray(p) => p.as_slice(),
669        _ => return,
670    };
671    for (k, v) in pairs {
672        let s = match v {
673            Amf0Value::Number(n) => format!("{n}"),
674            Amf0Value::Boolean(b) => b.to_string(),
675            Amf0Value::String(s) => s.clone(),
676            // Skip nested objects / arrays / null / undefined —
677            // the metadata() surface is intentionally flat.
678            _ => continue,
679        };
680        out.push((k.clone(), s));
681    }
682}
683
684/// Lift a crate-local [`crate::Error`] into the workspace
685/// [`oxideav_core::Error`]. Only the few variants we may
686/// surface during steady-state pumping are mapped specially —
687/// everything else collapses to `Error::Other`.
688pub(crate) fn rtmp_to_core_err(e: RtmpError) -> CoreError {
689    match e {
690        RtmpError::Io(io) => CoreError::Io(io),
691        RtmpError::UnexpectedEof => CoreError::Eof,
692        RtmpError::Timeout => CoreError::Other("rtmp: timeout".to_string()),
693        RtmpError::Rejected(r) => CoreError::Other(format!("rtmp: rejected: {r}")),
694        RtmpError::ProtocolViolation(m) => CoreError::InvalidData(format!("rtmp protocol: {m}")),
695        RtmpError::InvalidAmf0(m) => CoreError::InvalidData(format!("rtmp amf0: {m}")),
696        RtmpError::InvalidChunk(m) => CoreError::InvalidData(format!("rtmp chunk: {m}")),
697        RtmpError::InvalidCommand(m) => CoreError::InvalidData(format!("rtmp command: {m}")),
698        RtmpError::UnsupportedHandshakeVersion(v) => {
699            CoreError::Unsupported(format!("rtmp handshake version 0x{v:02x}"))
700        }
701        RtmpError::Other(m) => CoreError::Other(format!("rtmp: {m}")),
702    }
703}
704
705// ───────────────────────────── opener ─────────────────────────────
706
/// The pieces of an `rtmp://host:port/app[/stream_name]` URL that
/// the registry-listen flow needs. It resembles
/// [`crate::client::RtmpUrl`] but is parsed on its own, so the
/// client and adapter parsers stay decoupled.
#[derive(Debug, Clone)]
struct ListenUrl {
    /// `host:port` string for the local TCP bind.
    bind_addr: String,
    /// First path segment: the `app` the publisher must announce.
    expected_app: String,
    /// Rest of the path after the app segment; empty when the URL
    /// names no stream.
    expected_stream: String,
}
717
718impl ListenUrl {
719    fn parse(uri: &str) -> CoreResult<Self> {
720        let s = uri
721            .strip_prefix("rtmp://")
722            .ok_or_else(|| CoreError::InvalidData(format!("not an rtmp:// URL: {uri}")))?;
723        let slash = s
724            .find('/')
725            .ok_or_else(|| CoreError::InvalidData(format!("rtmp URL missing /app: {uri}")))?;
726        let authority = &s[..slash];
727        let path = &s[slash + 1..];
728        let (host, port_str) = match authority.rsplit_once(':') {
729            Some((h, p)) => (h, p),
730            None => (authority, "1935"),
731        };
732        let port: u16 = port_str
733            .parse()
734            .map_err(|e| CoreError::InvalidData(format!("rtmp URL bad port {port_str:?}: {e}")))?;
735        let bind_host = if host.is_empty() { "0.0.0.0" } else { host };
736        let bind_addr = format!("{bind_host}:{port}");
737        let (app, stream_name) = match path.find('/') {
738            Some(i) => (path[..i].to_owned(), path[i + 1..].to_owned()),
739            None => (path.to_owned(), String::new()),
740        };
741        Ok(Self {
742            bind_addr,
743            expected_app: app,
744            expected_stream: stream_name,
745        })
746    }
747}
748
749/// `SourceRegistry` opener for the `rtmp://` scheme.
750///
751/// Listens on the URL's `host:port`, accepts the first incoming
752/// publisher, and returns it as a [`PacketSource`]. The
753/// publisher's announced `app` / `stream_name` must match the URL
754/// path or the publish is politely rejected and the listener
755/// closes (the opener returns [`CoreError::InvalidData`]).
756///
757/// Subsequent publishers cannot share the same opener call: each
758/// `SourceRegistry::open("rtmp://…")` is a one-shot accept. To
759/// service many publishers, use [`RtmpServer::serve`] directly.
760pub fn open_rtmp(uri: &str) -> CoreResult<Box<dyn PacketSource>> {
761    let url = ListenUrl::parse(uri)?;
762    // Resolve once — if a hostname doesn't resolve we want a
763    // clean error rather than a confused TcpListener::bind panic.
764    let resolved = url
765        .bind_addr
766        .to_socket_addrs()
767        .map_err(CoreError::Io)?
768        .next()
769        .ok_or_else(|| {
770            CoreError::InvalidData(format!("rtmp URL resolved no addresses: {}", url.bind_addr))
771        })?;
772    let server = RtmpServer::bind(resolved).map_err(rtmp_to_core_err)?;
773    let req = server.accept().map_err(rtmp_to_core_err)?;
774    if !url.expected_app.is_empty() && req.app != url.expected_app {
775        let actual = req.app.clone();
776        let expected = url.expected_app.clone();
777        let _ = req.reject("unexpected app");
778        return Err(CoreError::InvalidData(format!(
779            "rtmp publisher app mismatch: expected {expected:?}, got {actual:?}"
780        )));
781    }
782    if !url.expected_stream.is_empty() && req.stream_name != url.expected_stream {
783        let actual = req.stream_name.clone();
784        let expected = url.expected_stream.clone();
785        let _ = req.reject("unexpected stream key");
786        return Err(CoreError::InvalidData(format!(
787            "rtmp publisher stream-name mismatch: expected {expected:?}, got {actual:?}"
788        )));
789    }
790    let session = req.accept().map_err(rtmp_to_core_err)?;
791    let source = RtmpPacketSource::from_session_with_probe(session, Some(PROBE_READ_TIMEOUT))
792        .map_err(rtmp_to_core_err)?;
793    Ok(Box::new(source))
794}
795
796/// Install the `rtmp://` scheme on the given [`SourceRegistry`].
797/// Every URL of the form `rtmp://host:port/app/stream-name` opens
798/// a one-shot listener that accepts a single publisher and feeds
799/// its packets through the registry's [`PacketSource`] dispatch.
800///
801/// Idempotent: re-registering replaces the prior opener.
802pub fn register(registry: &mut SourceRegistry) {
803    registry.register_packets("rtmp", open_rtmp);
804}
805
806// Suppress dead_code on the BytesSource re-export — it's needed
807// only for documentation cross-references in this module's docs.
808#[allow(dead_code)]
809fn _bytes_source_anchor(_: Box<dyn BytesSource>) {}
810
811#[cfg(test)]
812mod tests {
813    use super::*;
814    use crate::flv::{
815        AAC_PACKET_TYPE_RAW, AUDIO_FORMAT_EX_HEADER, AVC_PACKET_TYPE_NALU,
816        AVC_PACKET_TYPE_SEQUENCE_HEADER, EX_PACKET_TYPE_CODED_FRAMES, EX_PACKET_TYPE_METADATA,
817        EX_PACKET_TYPE_SEQUENCE_START, FOURCC_AV1, FOURCC_HEVC, FOURCC_VP9, VIDEO_FRAME_INTER,
818        VIDEO_FRAME_KEYFRAME,
819    };
820
821    #[test]
822    fn audio_codec_id_maps_aac_and_mp3() {
823        assert_eq!(audio_codec_id(AUDIO_FORMAT_AAC).as_str(), "aac");
824        assert_eq!(audio_codec_id(AUDIO_FORMAT_MP3).as_str(), "mp3");
825        assert_eq!(audio_codec_id(AUDIO_FORMAT_PCM_LE).as_str(), "pcm_s16le");
826        // Anything we don't model returns "unknown" — registry will
827        // surface the gap rather than silently mis-decode.
828        assert_eq!(audio_codec_id(0xFF).as_str(), "unknown");
829    }
830
831    #[test]
832    fn video_codec_id_maps_avc_and_h263() {
833        assert_eq!(video_codec_id(VIDEO_CODEC_AVC).as_str(), "h264");
834        assert_eq!(video_codec_id(VIDEO_CODEC_H263).as_str(), "h263");
835        assert_eq!(video_codec_id(VIDEO_CODEC_VP6).as_str(), "vp6f");
836        assert_eq!(video_codec_id(0xFF).as_str(), "unknown");
837    }
838
839    #[test]
840    fn audio_aac_seq_header_packet_carries_marker_and_header_flag() {
841        let tag = AudioTag {
842            mod_ex: Vec::new(),
843            sound_format: AUDIO_FORMAT_AAC,
844            sound_rate: 3,
845            sound_size_16bit: true,
846            stereo: true,
847            aac_packet_type: Some(AAC_PACKET_TYPE_SEQUENCE_HEADER),
848            body: vec![0x12, 0x10],
849            ex_packet_type: None,
850            audio_fourcc: None,
851
852            multitrack: None,
853        };
854        let pkt = audio_to_packet(0, &tag);
855        assert_eq!(pkt.stream_index, AUDIO_STREAM_INDEX);
856        assert_eq!(pkt.time_base, RTMP_TIME_BASE);
857        assert_eq!(pkt.pts, Some(0));
858        assert_eq!(pkt.dts, Some(0));
859        assert!(pkt.flags.header);
860        // packet type byte (0 = seq header) + body
861        assert_eq!(pkt.data, vec![0x00, 0x12, 0x10]);
862    }
863
864    #[test]
865    fn audio_aac_raw_packet_keeps_packet_type_byte() {
866        let tag = AudioTag {
867            mod_ex: Vec::new(),
868            sound_format: AUDIO_FORMAT_AAC,
869            sound_rate: 3,
870            sound_size_16bit: true,
871            stereo: true,
872            aac_packet_type: Some(AAC_PACKET_TYPE_RAW),
873            body: vec![0xAB, 0xCD, 0xEF],
874            ex_packet_type: None,
875            audio_fourcc: None,
876
877            multitrack: None,
878        };
879        let pkt = audio_to_packet(123, &tag);
880        // 123 ms → 123_000_000 ns on the RTMP_TIME_BASE timeline.
881        assert_eq!(pkt.pts, Some(123 * RTMP_MS_TO_NS));
882        assert_eq!(pkt.dts, Some(123 * RTMP_MS_TO_NS));
883        assert!(!pkt.flags.header);
884        assert_eq!(pkt.data, vec![0x01, 0xAB, 0xCD, 0xEF]);
885    }
886
887    #[test]
888    fn audio_mp3_packet_strips_flv_header_only() {
889        let tag = AudioTag {
890            mod_ex: Vec::new(),
891            sound_format: AUDIO_FORMAT_MP3,
892            sound_rate: 3,
893            sound_size_16bit: true,
894            stereo: true,
895            aac_packet_type: None,
896            body: vec![0xFF, 0xFB, 0x90, 0x00],
897            ex_packet_type: None,
898            audio_fourcc: None,
899
900            multitrack: None,
901        };
902        let pkt = audio_to_packet(40, &tag);
903        // No AAC marker prepended for non-AAC.
904        assert_eq!(pkt.data, vec![0xFF, 0xFB, 0x90, 0x00]);
905        assert_eq!(pkt.pts, Some(40 * RTMP_MS_TO_NS));
906    }
907
908    // ------- Enhanced RTMP v2 audio dispatch into Packet -------
909
910    #[test]
911    fn audio_codec_id_for_tag_dispatches_legacy_vs_fourcc() {
912        let legacy_aac = AudioTag {
913            mod_ex: Vec::new(),
914            sound_format: AUDIO_FORMAT_AAC,
915            sound_rate: 3,
916            sound_size_16bit: true,
917            stereo: true,
918            aac_packet_type: Some(AAC_PACKET_TYPE_SEQUENCE_HEADER),
919            body: vec![],
920            ex_packet_type: None,
921            audio_fourcc: None,
922
923            multitrack: None,
924        };
925        assert_eq!(audio_codec_id_for_tag(&legacy_aac).as_str(), "aac");
926        for (fcc, expected) in [
927            (flv::FOURCC_OPUS, "opus"),
928            (flv::FOURCC_FLAC, "flac"),
929            (flv::FOURCC_AC3, "ac3"),
930            (flv::FOURCC_EAC3, "eac3"),
931            (flv::FOURCC_MP3, "mp3"),
932            (flv::FOURCC_AAC, "aac"),
933        ] {
934            let t = AudioTag {
935                mod_ex: Vec::new(),
936                sound_format: AUDIO_FORMAT_EX_HEADER,
937                sound_rate: 0,
938                sound_size_16bit: false,
939                stereo: false,
940                aac_packet_type: None,
941                ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_CODED_FRAMES),
942                audio_fourcc: Some(fcc),
943                body: vec![],
944
945                multitrack: None,
946            };
947            assert_eq!(audio_codec_id_for_tag(&t).as_str(), expected);
948        }
949    }
950
951    #[test]
952    fn audio_fourcc_codec_id_unknown_collapses() {
953        // Forward-compatible fallback for codecs the workspace
954        // doesn't yet name.
955        assert_eq!(audio_fourcc_codec_id(*b"xxxx").as_str(), "unknown");
956    }
957
958    #[test]
959    fn ex_opus_sequence_start_packet_sets_header_flag_and_strips_fourcc() {
960        let tag = AudioTag {
961            mod_ex: Vec::new(),
962            sound_format: AUDIO_FORMAT_EX_HEADER,
963            sound_rate: 0,
964            sound_size_16bit: false,
965            stereo: false,
966            aac_packet_type: None,
967            ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_START),
968            audio_fourcc: Some(flv::FOURCC_OPUS),
969            body: b"OpusHead\x01\x02\x38\x01\x80\xbb\x00\x00\x00\x00\x00".to_vec(),
970
971            multitrack: None,
972        };
973        let pkt = audio_to_packet(0, &tag);
974        assert_eq!(pkt.stream_index, AUDIO_STREAM_INDEX);
975        assert!(pkt.flags.header);
976        // The Opus ID-header bytes pass through unchanged — no
977        // legacy AAC packet-type marker is prepended in Enhanced
978        // mode.
979        assert_eq!(
980            pkt.data,
981            b"OpusHead\x01\x02\x38\x01\x80\xbb\x00\x00\x00\x00\x00".to_vec()
982        );
983    }
984
985    #[test]
986    fn ex_ac3_coded_frames_packet_strips_fourcc_and_keeps_body() {
987        let tag = AudioTag {
988            mod_ex: Vec::new(),
989            sound_format: AUDIO_FORMAT_EX_HEADER,
990            sound_rate: 0,
991            sound_size_16bit: false,
992            stereo: false,
993            aac_packet_type: None,
994            ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_CODED_FRAMES),
995            audio_fourcc: Some(flv::FOURCC_AC3),
996            body: vec![0x0B, 0x77, 0xAB, 0xCD, 0xEF],
997
998            multitrack: None,
999        };
1000        let pkt = audio_to_packet(200, &tag);
1001        assert!(!pkt.flags.header);
1002        assert_eq!(pkt.dts, Some(200 * RTMP_MS_TO_NS));
1003        assert_eq!(pkt.pts, Some(200 * RTMP_MS_TO_NS));
1004        // Raw AC-3 frame bytes — no marker, no header.
1005        assert_eq!(pkt.data, vec![0x0B, 0x77, 0xAB, 0xCD, 0xEF]);
1006    }
1007
1008    #[test]
1009    fn ex_audio_sequence_end_packet_flagged_header_with_empty_body() {
1010        let tag = AudioTag {
1011            mod_ex: Vec::new(),
1012            sound_format: AUDIO_FORMAT_EX_HEADER,
1013            sound_rate: 0,
1014            sound_size_16bit: false,
1015            stereo: false,
1016            aac_packet_type: None,
1017            ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_END),
1018            audio_fourcc: Some(flv::FOURCC_OPUS),
1019            body: vec![],
1020
1021            multitrack: None,
1022        };
1023        let pkt = audio_to_packet(999, &tag);
1024        // SequenceEnd is a flush boundary — header flag lets the
1025        // consumer route it without trying to decode an empty
1026        // frame.
1027        assert!(pkt.flags.header);
1028        assert!(pkt.data.is_empty());
1029        assert_eq!(pkt.dts, Some(999 * RTMP_MS_TO_NS));
1030    }
1031
1032    #[test]
1033    fn ex_audio_codec_params_copies_sequence_start_to_extradata() {
1034        let tag = AudioTag {
1035            mod_ex: Vec::new(),
1036            sound_format: AUDIO_FORMAT_EX_HEADER,
1037            sound_rate: 0,
1038            sound_size_16bit: false,
1039            stereo: false,
1040            aac_packet_type: None,
1041            ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_START),
1042            audio_fourcc: Some(flv::FOURCC_FLAC),
1043            body: b"fLaC\x80\x00\x00\x22streaminfo-body-bytes".to_vec(),
1044
1045            multitrack: None,
1046        };
1047        let p = audio_codec_params(&tag);
1048        assert_eq!(p.codec_id.as_str(), "flac");
1049        assert_eq!(p.extradata, tag.body);
1050        // Legacy SoundRate/Stereo hints are skipped — the spec
1051        // says the bit-field "are not interpreted" in ExHeader
1052        // mode and codec-internal headers carry the truth.
1053        assert_eq!(p.sample_rate, None);
1054        assert_eq!(p.channels, None);
1055    }
1056
1057    #[test]
1058    fn ex_audio_codec_params_ac3_coded_frames_leaves_extradata_empty() {
1059        // AC-3 has no SequenceStart shape defined in v2 — only
1060        // CodedFrames carries data. Ensure `audio_codec_params`
1061        // doesn't accidentally treat a CodedFrames body as
1062        // extradata.
1063        let tag = AudioTag {
1064            mod_ex: Vec::new(),
1065            sound_format: AUDIO_FORMAT_EX_HEADER,
1066            sound_rate: 0,
1067            sound_size_16bit: false,
1068            stereo: false,
1069            aac_packet_type: None,
1070            ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_CODED_FRAMES),
1071            audio_fourcc: Some(flv::FOURCC_AC3),
1072            body: vec![0x0B, 0x77, 0xAB, 0xCD],
1073
1074            multitrack: None,
1075        };
1076        let p = audio_codec_params(&tag);
1077        assert_eq!(p.codec_id.as_str(), "ac3");
1078        assert!(p.extradata.is_empty());
1079    }
1080
1081    #[test]
1082    fn video_avc_keyframe_packet_keyframe_flag_and_no_pts_offset() {
1083        let tag = VideoTag {
1084            mod_ex: Vec::new(),
1085            frame_type: VIDEO_FRAME_KEYFRAME,
1086            codec_id: VIDEO_CODEC_AVC,
1087            avc_packet_type: Some(AVC_PACKET_TYPE_NALU),
1088            composition_time: 0,
1089            body: b"\x00\x00\x00\x05hello".to_vec(),
1090            ex_packet_type: None,
1091            fourcc: None,
1092
1093            multitrack: None,
1094        };
1095        let pkt = video_to_packet(33, &tag);
1096        assert_eq!(pkt.stream_index, VIDEO_STREAM_INDEX);
1097        assert!(pkt.flags.keyframe);
1098        assert!(!pkt.flags.header);
1099        assert_eq!(pkt.pts, Some(33 * RTMP_MS_TO_NS));
1100        assert_eq!(pkt.dts, Some(33 * RTMP_MS_TO_NS));
1101        assert_eq!(pkt.data, b"\x00\x00\x00\x05hello".to_vec());
1102    }
1103
1104    #[test]
1105    fn video_avc_inter_packet_with_negative_cts_offsets_pts() {
1106        let tag = VideoTag {
1107            mod_ex: Vec::new(),
1108            frame_type: VIDEO_FRAME_INTER,
1109            codec_id: VIDEO_CODEC_AVC,
1110            avc_packet_type: Some(AVC_PACKET_TYPE_NALU),
1111            composition_time: -10,
1112            body: vec![1, 2, 3],
1113            ex_packet_type: None,
1114            fourcc: None,
1115
1116            multitrack: None,
1117        };
1118        let pkt = video_to_packet(100, &tag);
1119        assert!(!pkt.flags.keyframe);
1120        assert_eq!(pkt.dts, Some(100 * RTMP_MS_TO_NS));
1121        assert_eq!(pkt.pts, Some(90 * RTMP_MS_TO_NS));
1122    }
1123
1124    #[test]
1125    fn video_avc_seq_header_marks_header_flag() {
1126        let tag = VideoTag {
1127            mod_ex: Vec::new(),
1128            frame_type: VIDEO_FRAME_KEYFRAME,
1129            codec_id: VIDEO_CODEC_AVC,
1130            avc_packet_type: Some(AVC_PACKET_TYPE_SEQUENCE_HEADER),
1131            composition_time: 0,
1132            body: b"\x01\x42\x80\x1e".to_vec(),
1133            ex_packet_type: None,
1134            fourcc: None,
1135
1136            multitrack: None,
1137        };
1138        let pkt = video_to_packet(0, &tag);
1139        assert!(pkt.flags.keyframe);
1140        assert!(pkt.flags.header);
1141        assert_eq!(pkt.data, b"\x01\x42\x80\x1e".to_vec());
1142    }
1143
1144    #[test]
1145    fn video_h263_packet_keeps_body_and_pts_eq_dts() {
1146        let tag = VideoTag {
1147            mod_ex: Vec::new(),
1148            frame_type: VIDEO_FRAME_INTER,
1149            codec_id: VIDEO_CODEC_H263,
1150            avc_packet_type: None,
1151            composition_time: 0,
1152            body: vec![0xAA, 0xBB, 0xCC],
1153            ex_packet_type: None,
1154            fourcc: None,
1155
1156            multitrack: None,
1157        };
1158        let pkt = video_to_packet(50, &tag);
1159        assert_eq!(pkt.pts, pkt.dts);
1160        assert_eq!(pkt.data, vec![0xAA, 0xBB, 0xCC]);
1161    }
1162
1163    // ------- Enhanced RTMP v1 dispatch into Packet -------
1164
1165    #[test]
1166    fn video_codec_id_for_tag_dispatches_legacy_vs_fourcc() {
1167        let avc = VideoTag {
1168            mod_ex: Vec::new(),
1169            frame_type: VIDEO_FRAME_KEYFRAME,
1170            codec_id: VIDEO_CODEC_AVC,
1171            avc_packet_type: Some(AVC_PACKET_TYPE_NALU),
1172            composition_time: 0,
1173            body: vec![],
1174            ex_packet_type: None,
1175            fourcc: None,
1176
1177            multitrack: None,
1178        };
1179        assert_eq!(video_codec_id_for_tag(&avc).as_str(), "h264");
1180        for (fcc, expected) in [
1181            (FOURCC_HEVC, "hevc"),
1182            (FOURCC_AV1, "av1"),
1183            (FOURCC_VP9, "vp9"),
1184        ] {
1185            let t = VideoTag {
1186                mod_ex: Vec::new(),
1187                frame_type: VIDEO_FRAME_KEYFRAME,
1188                codec_id: 0,
1189                avc_packet_type: None,
1190                composition_time: 0,
1191                body: vec![],
1192                ex_packet_type: Some(EX_PACKET_TYPE_CODED_FRAMES),
1193                fourcc: Some(fcc),
1194
1195                multitrack: None,
1196            };
1197            assert_eq!(video_codec_id_for_tag(&t).as_str(), expected);
1198        }
1199    }
1200
1201    #[test]
1202    fn ex_hevc_sequence_start_packet_sets_header_flag() {
1203        // Enhanced RTMP `PacketTypeSequenceStart` body is the
1204        // `HEVCDecoderConfigurationRecord`. We surface it just
1205        // like AVC's `avcC` — `flags.header == true`, body in
1206        // `pkt.data` for downstream extradata harvesting.
1207        let tag = VideoTag {
1208            mod_ex: Vec::new(),
1209            frame_type: VIDEO_FRAME_KEYFRAME,
1210            codec_id: 0,
1211            avc_packet_type: None,
1212            composition_time: 0,
1213            body: b"\x01hvcc-stub".to_vec(),
1214            ex_packet_type: Some(EX_PACKET_TYPE_SEQUENCE_START),
1215            fourcc: Some(FOURCC_HEVC),
1216
1217            multitrack: None,
1218        };
1219        let pkt = video_to_packet(0, &tag);
1220        assert!(pkt.flags.header);
1221        assert!(pkt.flags.keyframe);
1222        assert_eq!(pkt.dts, Some(0));
1223        assert_eq!(pkt.pts, Some(0));
1224        assert_eq!(pkt.time_base, RTMP_TIME_BASE);
1225        assert_eq!(pkt.data, b"\x01hvcc-stub".to_vec());
1226    }
1227
1228    #[test]
1229    fn ex_hevc_coded_frames_with_cts_offsets_pts() {
1230        // Only HEVC × CodedFrames carries CTS on the wire in
1231        // Enhanced RTMP — exercise the CTS-propagation branch.
1232        let tag = VideoTag {
1233            mod_ex: Vec::new(),
1234            frame_type: VIDEO_FRAME_INTER,
1235            codec_id: 0,
1236            avc_packet_type: None,
1237            composition_time: 17,
1238            body: b"\x00\x00\x00\x04NALU".to_vec(),
1239            ex_packet_type: Some(EX_PACKET_TYPE_CODED_FRAMES),
1240            fourcc: Some(FOURCC_HEVC),
1241
1242            multitrack: None,
1243        };
1244        let pkt = video_to_packet(200, &tag);
1245        assert!(!pkt.flags.keyframe);
1246        assert!(!pkt.flags.header);
1247        assert_eq!(pkt.dts, Some(200 * RTMP_MS_TO_NS));
1248        assert_eq!(pkt.pts, Some(217 * RTMP_MS_TO_NS));
1249    }
1250
1251    #[test]
1252    fn ex_av1_coded_frames_no_cts_offset() {
1253        // AV1 / VP9 leave CTS implied-zero — `pts == dts`.
1254        let tag = VideoTag {
1255            mod_ex: Vec::new(),
1256            frame_type: VIDEO_FRAME_KEYFRAME,
1257            codec_id: 0,
1258            avc_packet_type: None,
1259            composition_time: 0,
1260            body: vec![0x0a, 0x0b],
1261            ex_packet_type: Some(EX_PACKET_TYPE_CODED_FRAMES),
1262            fourcc: Some(FOURCC_AV1),
1263
1264            multitrack: None,
1265        };
1266        let pkt = video_to_packet(500, &tag);
1267        assert!(pkt.flags.keyframe);
1268        assert!(!pkt.flags.header);
1269        assert_eq!(pkt.dts, pkt.pts);
1270        assert_eq!(pkt.dts, Some(500 * RTMP_MS_TO_NS));
1271    }
1272
1273    #[test]
1274    fn ex_metadata_packet_ignores_frame_type_flags() {
1275        // Per spec the FrameType bits MUST be ignored for
1276        // PacketTypeMetadata; we suppress `keyframe` and set
1277        // `header` so the consumer routes it to its sideband
1278        // (HDR `colorInfo` etc.) instead of the decoder.
1279        let tag = VideoTag {
1280            mod_ex: Vec::new(),
1281            frame_type: VIDEO_FRAME_KEYFRAME, // would normally → keyframe = true
1282            codec_id: 0,
1283            avc_packet_type: None,
1284            composition_time: 0,
1285            body: b"amf-payload".to_vec(),
1286            ex_packet_type: Some(EX_PACKET_TYPE_METADATA),
1287            fourcc: Some(FOURCC_HEVC),
1288
1289            multitrack: None,
1290        };
1291        let pkt = video_to_packet(123, &tag);
1292        assert!(!pkt.flags.keyframe);
1293        assert!(pkt.flags.header);
1294        assert_eq!(pkt.data, b"amf-payload".to_vec());
1295    }
1296
1297    // ------- TimestampOffsetNano fold into the ns Packet timeline -------
1298
1299    #[test]
1300    fn audio_timestamp_offset_nano_folds_into_presentation_time() {
1301        // `enhanced-rtmp-v2.pdf` §"ExAudioTagHeader" defines a
1302        // `TimestampOffsetNano` ModEx subtype carrying a 0..=999_999
1303        // ns offset added to the *presentation* time of the current
1304        // media message without altering the core RTMP timestamp.
1305        // For audio pts == dts (no separate decode time) so the
1306        // offset rides on both.
1307        let tag = AudioTag {
1308            mod_ex: vec![crate::flv::ModEx::timestamp_offset_nano_entry(750_000)],
1309            sound_format: AUDIO_FORMAT_EX_HEADER,
1310            sound_rate: 0,
1311            sound_size_16bit: false,
1312            stereo: false,
1313            aac_packet_type: None,
1314            ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_CODED_FRAMES),
1315            audio_fourcc: Some(flv::FOURCC_OPUS),
1316            body: vec![0x12, 0x34, 0x56],
1317
1318            multitrack: None,
1319        };
1320        let pkt = audio_to_packet(40, &tag);
1321        // 40 ms * 1e6 ns/ms + 750_000 ns = 40_750_000 ns.
1322        assert_eq!(pkt.pts, Some(40_750_000));
1323        assert_eq!(pkt.dts, Some(40_750_000));
1324        assert_eq!(pkt.time_base, RTMP_TIME_BASE);
1325    }
1326
1327    #[test]
1328    fn video_timestamp_offset_nano_folds_into_pts_only() {
1329        // Per spec the nanosecond offset adjusts the presentation
1330        // time; for video that's PTS. DTS (core decode timestamp)
1331        // is preserved as the raw ms value scaled to ns.
1332        let tag = VideoTag {
1333            mod_ex: vec![crate::flv::ModEx::timestamp_offset_nano_entry(123_456)],
1334            frame_type: VIDEO_FRAME_INTER,
1335            codec_id: 0,
1336            avc_packet_type: None,
1337            composition_time: 0,
1338            body: b"\x00\x00\x00\x04NALU".to_vec(),
1339            ex_packet_type: Some(EX_PACKET_TYPE_CODED_FRAMES),
1340            fourcc: Some(FOURCC_AV1),
1341
1342            multitrack: None,
1343        };
1344        let pkt = video_to_packet(60, &tag);
1345        // DTS stays on the raw ms grid (in ns units) — 60 ms = 60_000_000 ns.
1346        assert_eq!(pkt.dts, Some(60_000_000));
1347        // PTS = DTS + nano_offset (AV1 carries no CTS).
1348        assert_eq!(pkt.pts, Some(60_123_456));
1349        assert_eq!(pkt.time_base, RTMP_TIME_BASE);
1350    }
1351
1352    #[test]
1353    fn video_timestamp_offset_nano_stacks_on_cts_and_dts_unchanged() {
1354        // HEVC × CodedFrames pair carries CTS on the wire — make
1355        // sure the ns offset stacks on top of (CTS * 1e6) without
1356        // perturbing DTS.
1357        let tag = VideoTag {
1358            mod_ex: vec![crate::flv::ModEx::timestamp_offset_nano_entry(500_000)],
1359            frame_type: VIDEO_FRAME_INTER,
1360            codec_id: 0,
1361            avc_packet_type: None,
1362            composition_time: 17,
1363            body: b"\x00\x00\x00\x04NALU".to_vec(),
1364            ex_packet_type: Some(EX_PACKET_TYPE_CODED_FRAMES),
1365            fourcc: Some(FOURCC_HEVC),
1366
1367            multitrack: None,
1368        };
1369        let pkt = video_to_packet(200, &tag);
1370        // DTS = 200 ms * 1e6 ns/ms — no offset.
1371        assert_eq!(pkt.dts, Some(200_000_000));
1372        // PTS = (200 + 17) ms * 1e6 + 500_000 ns = 217_500_000 ns.
1373        assert_eq!(pkt.pts, Some(217_500_000));
1374    }
1375
1376    #[test]
1377    fn video_timestamp_offset_nano_sums_multiple_modex_entries() {
1378        // The accessor sums every `TimestampOffsetNano` entry in
1379        // the chain. A non-`TimestampOffsetNano` entry in the middle
1380        // does not perturb the sum.
1381        let tag = VideoTag {
1382            mod_ex: vec![
1383                crate::flv::ModEx::timestamp_offset_nano_entry(200_000),
1384                // Unknown / reserved ModEx type — must not feed the sum.
1385                crate::flv::ModEx {
1386                    mod_ex_type: 0x0F,
1387                    data: vec![0xAA, 0xBB, 0xCC],
1388                },
1389                crate::flv::ModEx::timestamp_offset_nano_entry(300_000),
1390            ],
1391            frame_type: VIDEO_FRAME_KEYFRAME,
1392            codec_id: 0,
1393            avc_packet_type: None,
1394            composition_time: 0,
1395            body: vec![0x0A],
1396            ex_packet_type: Some(EX_PACKET_TYPE_CODED_FRAMES),
1397            fourcc: Some(FOURCC_VP9),
1398
1399            multitrack: None,
1400        };
1401        let pkt = video_to_packet(10, &tag);
1402        // 10 ms * 1e6 + (200_000 + 300_000) ns = 10_500_000 ns.
1403        assert_eq!(pkt.pts, Some(10_500_000));
1404        assert_eq!(pkt.dts, Some(10_000_000));
1405    }
1406
1407    #[test]
1408    fn time_base_is_nanoseconds() {
1409        // The whole RTMP adapter timeline is 1/1_000_000_000.
1410        assert_eq!(RTMP_TIME_BASE, TimeBase::new(1, 1_000_000_000));
1411        assert_eq!(RTMP_MS_TO_NS, 1_000_000);
1412    }
1413
1414    #[test]
1415    fn legacy_avc_seq_header_constant_still_referenced() {
1416        // Sanity test ensures the `AVC_PACKET_TYPE_SEQUENCE_HEADER`
1417        // import isn't dropped by a future refactor — the symbol
1418        // is part of the public re-export chain for downstream
1419        // codec adapters that hand-craft VideoTag literals.
1420        let _ = AVC_PACKET_TYPE_SEQUENCE_HEADER;
1421    }
1422
1423    #[test]
1424    fn listen_url_parses_host_port_app_key() {
1425        let u = ListenUrl::parse("rtmp://127.0.0.1:1935/live/secret").expect("parse");
1426        assert_eq!(u.bind_addr, "127.0.0.1:1935");
1427        assert_eq!(u.expected_app, "live");
1428        assert_eq!(u.expected_stream, "secret");
1429    }
1430
1431    #[test]
1432    fn listen_url_default_port_is_1935() {
1433        let u = ListenUrl::parse("rtmp://0.0.0.0/live/key").expect("parse");
1434        assert_eq!(u.bind_addr, "0.0.0.0:1935");
1435    }
1436
1437    #[test]
1438    fn listen_url_accepts_app_only_path() {
1439        let u = ListenUrl::parse("rtmp://127.0.0.1:1935/live").expect("parse");
1440        assert_eq!(u.expected_app, "live");
1441        assert_eq!(u.expected_stream, "");
1442    }
1443
1444    #[test]
1445    fn listen_url_rejects_non_rtmp_scheme() {
1446        assert!(ListenUrl::parse("http://x/y").is_err());
1447    }
1448
1449    #[test]
1450    fn listen_url_rejects_missing_path() {
1451        assert!(ListenUrl::parse("rtmp://127.0.0.1:1935").is_err());
1452    }
1453
1454    #[test]
1455    fn flatten_metadata_keeps_scalars_and_drops_objects() {
1456        let v = Amf0Value::Object(vec![
1457            ("width".into(), Amf0Value::Number(1280.0)),
1458            ("height".into(), Amf0Value::Number(720.0)),
1459            ("encoder".into(), Amf0Value::String("oxideav".into())),
1460            ("vhost".into(), Amf0Value::Object(vec![])),
1461            ("live".into(), Amf0Value::Boolean(true)),
1462        ]);
1463        let mut out = Vec::new();
1464        flatten_metadata(&v, &mut out);
1465        assert_eq!(
1466            out,
1467            vec![
1468                ("width".to_string(), "1280".to_string()),
1469                ("height".to_string(), "720".to_string()),
1470                ("encoder".to_string(), "oxideav".to_string()),
1471                // "vhost" object dropped.
1472                ("live".to_string(), "true".to_string()),
1473            ]
1474        );
1475    }
1476
1477    #[test]
1478    fn rtmp_to_core_err_maps_unexpected_eof_to_eof() {
1479        let core = rtmp_to_core_err(RtmpError::UnexpectedEof);
1480        assert!(matches!(core, CoreError::Eof));
1481    }
1482
1483    #[test]
1484    fn rtmp_to_core_err_maps_protocol_violation_to_invalid_data() {
1485        let core = rtmp_to_core_err(RtmpError::ProtocolViolation("bad chunk size".into()));
1486        match core {
1487            CoreError::InvalidData(s) => assert!(s.contains("bad chunk size")),
1488            _ => panic!("expected InvalidData"),
1489        }
1490    }
1491}