use std::collections::VecDeque;
use std::net::ToSocketAddrs;
use std::time::Duration;
use oxideav_core::{
BytesSource, CodecId, CodecParameters, Error as CoreError, Packet, PacketSource,
Result as CoreResult, SourceRegistry, StreamInfo, TimeBase,
};
use crate::amf::Amf0Value;
use crate::error::{Error as RtmpError, Result as RtmpResult};
use crate::flv::{
self, AudioTag, VideoTag, AAC_PACKET_TYPE_SEQUENCE_HEADER, AUDIO_FORMAT_AAC,
AUDIO_FORMAT_ADPCM, AUDIO_FORMAT_G711_ALAW, AUDIO_FORMAT_G711_MULAW, AUDIO_FORMAT_MP3,
AUDIO_FORMAT_NELLYMOSER, AUDIO_FORMAT_NELLYMOSER_16K_MONO, AUDIO_FORMAT_NELLYMOSER_8K_MONO,
AUDIO_FORMAT_PCM_LE, AUDIO_FORMAT_PCM_LE_8BIT, AUDIO_FORMAT_SPEEX, VIDEO_CODEC_AVC,
VIDEO_CODEC_H263, VIDEO_CODEC_SCREEN, VIDEO_CODEC_SCREEN_V2, VIDEO_CODEC_VP6, VIDEO_CODEC_VP6A,
};
use crate::server::{RtmpServer, RtmpSession, StreamPacket};
pub const AUDIO_STREAM_INDEX: u32 = 0;
pub const VIDEO_STREAM_INDEX: u32 = 1;
pub const RTMP_TIME_BASE: TimeBase = TimeBase::new(1, 1000);
pub const PROBE_LIMIT: usize = 32;
pub const PROBE_READ_TIMEOUT: Duration = Duration::from_secs(10);
struct BufferedPacket {
packet: Packet,
#[allow(dead_code)]
is_audio: bool,
}
pub struct RtmpPacketSource {
session: RtmpSession,
streams: Vec<StreamInfo>,
metadata: Vec<(String, String)>,
buffered: VecDeque<BufferedPacket>,
ended: bool,
}
impl RtmpPacketSource {
pub fn from_session(session: RtmpSession) -> Self {
Self {
session,
streams: Vec::new(),
metadata: Vec::new(),
buffered: VecDeque::new(),
ended: false,
}
}
pub fn from_session_with_probe(
mut session: RtmpSession,
read_timeout: Option<Duration>,
) -> RtmpResult<Self> {
if let Some(d) = read_timeout {
let _ = session.set_read_timeout(Some(d));
}
let mut streams: Vec<StreamInfo> = Vec::new();
let mut metadata: Vec<(String, String)> = Vec::new();
let mut buffered: VecDeque<BufferedPacket> = VecDeque::new();
let mut have_audio = false;
let mut have_video = false;
let mut ended = false;
for _ in 0..PROBE_LIMIT {
if have_audio && have_video {
break;
}
let next = match session.next_packet() {
Ok(Some(p)) => p,
Ok(None) => {
ended = true;
break;
}
Err(RtmpError::Io(e))
if matches!(
e.kind(),
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
) =>
{
break;
}
Err(e) => return Err(e),
};
match next {
StreamPacket::Audio { timestamp, tag } => {
if !have_audio {
let params = audio_codec_params(&tag);
streams.push(StreamInfo {
index: AUDIO_STREAM_INDEX,
time_base: RTMP_TIME_BASE,
duration: None,
start_time: None,
params,
});
have_audio = true;
}
let pkt = audio_to_packet(timestamp, &tag);
buffered.push_back(BufferedPacket {
packet: pkt,
is_audio: true,
});
}
StreamPacket::Video { timestamp, tag } => {
if !have_video {
let params = video_codec_params(&tag);
streams.push(StreamInfo {
index: VIDEO_STREAM_INDEX,
time_base: RTMP_TIME_BASE,
duration: None,
start_time: None,
params,
});
have_video = true;
}
let pkt = video_to_packet(timestamp, &tag);
buffered.push_back(BufferedPacket {
packet: pkt,
is_audio: false,
});
}
StreamPacket::Metadata(value) => {
flatten_metadata(&value, &mut metadata);
}
}
}
let _ = session.set_read_timeout(None);
streams.sort_by_key(|s| s.index);
Ok(Self {
session,
streams,
metadata,
buffered,
ended,
})
}
pub fn session(&self) -> &RtmpSession {
&self.session
}
}
impl PacketSource for RtmpPacketSource {
fn streams(&self) -> &[StreamInfo] {
&self.streams
}
fn next_packet(&mut self) -> CoreResult<Packet> {
if let Some(buf) = self.buffered.pop_front() {
return Ok(buf.packet);
}
if self.ended {
return Err(CoreError::Eof);
}
loop {
let event = self.session.next_packet().map_err(rtmp_to_core_err)?;
match event {
Some(StreamPacket::Audio { timestamp, tag }) => {
if self.streams.iter().all(|s| s.index != AUDIO_STREAM_INDEX) {
let params = audio_codec_params(&tag);
self.streams.push(StreamInfo {
index: AUDIO_STREAM_INDEX,
time_base: RTMP_TIME_BASE,
duration: None,
start_time: None,
params,
});
self.streams.sort_by_key(|s| s.index);
}
return Ok(audio_to_packet(timestamp, &tag));
}
Some(StreamPacket::Video { timestamp, tag }) => {
if self.streams.iter().all(|s| s.index != VIDEO_STREAM_INDEX) {
let params = video_codec_params(&tag);
self.streams.push(StreamInfo {
index: VIDEO_STREAM_INDEX,
time_base: RTMP_TIME_BASE,
duration: None,
start_time: None,
params,
});
self.streams.sort_by_key(|s| s.index);
}
return Ok(video_to_packet(timestamp, &tag));
}
Some(StreamPacket::Metadata(value)) => {
flatten_metadata(&value, &mut self.metadata);
continue;
}
None => {
self.ended = true;
return Err(CoreError::Eof);
}
}
}
}
fn metadata(&self) -> &[(String, String)] {
&self.metadata
}
fn duration_micros(&self) -> Option<i64> {
None
}
}
pub fn audio_to_packet(timestamp_ms: u32, tag: &AudioTag) -> Packet {
let ts = timestamp_ms as i64;
let (data, is_header) = if tag.audio_fourcc.is_some() {
let header = matches!(
tag.ex_packet_type,
Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_START) | Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_END)
);
(tag.body.clone(), header)
} else {
let mut data = Vec::with_capacity(tag.body.len() + 1);
if tag.sound_format == AUDIO_FORMAT_AAC {
data.push(tag.aac_packet_type.unwrap_or(flv::AAC_PACKET_TYPE_RAW));
}
data.extend_from_slice(&tag.body);
let header = tag.sound_format == AUDIO_FORMAT_AAC
&& tag.aac_packet_type == Some(AAC_PACKET_TYPE_SEQUENCE_HEADER);
(data, header)
};
let flags = oxideav_core::packet::PacketFlags {
header: is_header,
..Default::default()
};
Packet {
stream_index: AUDIO_STREAM_INDEX,
time_base: RTMP_TIME_BASE,
pts: Some(ts),
dts: Some(ts),
duration: None,
flags,
data,
}
}
pub fn video_to_packet(timestamp_ms: u32, tag: &VideoTag) -> Packet {
let dts = timestamp_ms as i64;
let has_cts =
tag.codec_id == VIDEO_CODEC_AVC || (tag.fourcc.is_some() && tag.composition_time != 0);
let pts = if has_cts {
dts + tag.composition_time as i64
} else {
dts
};
let is_header = tag.is_avc_sequence_header() || tag.is_ex_sequence_header();
let is_metadata = tag.is_ex_metadata();
let flags = oxideav_core::packet::PacketFlags {
keyframe: !is_metadata && tag.is_keyframe(),
header: is_header || is_metadata,
..Default::default()
};
Packet {
stream_index: VIDEO_STREAM_INDEX,
time_base: RTMP_TIME_BASE,
pts: Some(pts),
dts: Some(dts),
duration: None,
flags,
data: tag.body.clone(),
}
}
pub fn audio_codec_id(sound_format: u8) -> CodecId {
let s = match sound_format {
AUDIO_FORMAT_PCM_LE => "pcm_s16le",
AUDIO_FORMAT_ADPCM => "adpcm_swf",
AUDIO_FORMAT_MP3 => "mp3",
AUDIO_FORMAT_PCM_LE_8BIT => "pcm_u8",
AUDIO_FORMAT_NELLYMOSER_16K_MONO => "nellymoser",
AUDIO_FORMAT_NELLYMOSER_8K_MONO => "nellymoser",
AUDIO_FORMAT_NELLYMOSER => "nellymoser",
AUDIO_FORMAT_G711_ALAW => "pcm_alaw",
AUDIO_FORMAT_G711_MULAW => "pcm_mulaw",
AUDIO_FORMAT_AAC => "aac",
AUDIO_FORMAT_SPEEX => "speex",
_ => "unknown",
};
CodecId::new(s)
}
pub fn audio_fourcc_codec_id(fourcc: [u8; 4]) -> CodecId {
let s = match &fourcc {
b"Opus" => "opus",
b"fLaC" => "flac",
b"ac-3" => "ac3",
b"ec-3" => "eac3",
b".mp3" => "mp3",
b"mp4a" => "aac",
_ => "unknown",
};
CodecId::new(s)
}
pub fn audio_codec_id_for_tag(tag: &AudioTag) -> CodecId {
if let Some(fcc) = tag.audio_fourcc {
audio_fourcc_codec_id(fcc)
} else {
audio_codec_id(tag.sound_format)
}
}
pub fn video_codec_id(codec_id: u8) -> CodecId {
let s = match codec_id {
VIDEO_CODEC_H263 => "h263",
VIDEO_CODEC_SCREEN => "flashsv",
VIDEO_CODEC_VP6 => "vp6f",
VIDEO_CODEC_VP6A => "vp6a",
VIDEO_CODEC_SCREEN_V2 => "flashsv2",
VIDEO_CODEC_AVC => "h264",
_ => "unknown",
};
CodecId::new(s)
}
pub fn video_fourcc_codec_id(fourcc: [u8; 4]) -> CodecId {
let s = match &fourcc {
b"av01" => "av1",
b"vp09" => "vp9",
b"hvc1" => "hevc",
b"vp08" => "vp8",
b"avc1" => "h264",
b"vvc1" => "vvc",
_ => "unknown",
};
CodecId::new(s)
}
pub fn video_codec_id_for_tag(tag: &VideoTag) -> CodecId {
if let Some(fcc) = tag.fourcc {
video_fourcc_codec_id(fcc)
} else {
video_codec_id(tag.codec_id)
}
}
fn audio_codec_params(tag: &AudioTag) -> CodecParameters {
let mut p = CodecParameters::audio(audio_codec_id_for_tag(tag));
if tag.audio_fourcc.is_some() {
if tag.ex_packet_type == Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_START) {
p.extradata = tag.body.clone();
}
return p;
}
if tag.sound_format != AUDIO_FORMAT_AAC {
let rate = match tag.sound_rate {
0 => 5_512,
1 => 11_025,
2 => 22_050,
_ => 44_100,
};
p.sample_rate = Some(rate);
p.channels = Some(if tag.stereo { 2 } else { 1 });
}
if tag.sound_format == AUDIO_FORMAT_AAC
&& tag.aac_packet_type == Some(AAC_PACKET_TYPE_SEQUENCE_HEADER)
{
p.extradata = tag.body.clone();
}
p
}
fn video_codec_params(tag: &VideoTag) -> CodecParameters {
let mut p = CodecParameters::video(video_codec_id_for_tag(tag));
if tag.is_avc_sequence_header() || tag.is_ex_sequence_header() {
p.extradata = tag.body.clone();
}
p
}
fn flatten_metadata(value: &Amf0Value, out: &mut Vec<(String, String)>) {
let pairs: &[(String, Amf0Value)] = match value {
Amf0Value::Object(p) => p.as_slice(),
Amf0Value::EcmaArray(p) => p.as_slice(),
_ => return,
};
for (k, v) in pairs {
let s = match v {
Amf0Value::Number(n) => format!("{n}"),
Amf0Value::Boolean(b) => b.to_string(),
Amf0Value::String(s) => s.clone(),
_ => continue,
};
out.push((k.clone(), s));
}
}
pub(crate) fn rtmp_to_core_err(e: RtmpError) -> CoreError {
match e {
RtmpError::Io(io) => CoreError::Io(io),
RtmpError::UnexpectedEof => CoreError::Eof,
RtmpError::Timeout => CoreError::Other("rtmp: timeout".to_string()),
RtmpError::Rejected(r) => CoreError::Other(format!("rtmp: rejected: {r}")),
RtmpError::ProtocolViolation(m) => CoreError::InvalidData(format!("rtmp protocol: {m}")),
RtmpError::InvalidAmf0(m) => CoreError::InvalidData(format!("rtmp amf0: {m}")),
RtmpError::InvalidChunk(m) => CoreError::InvalidData(format!("rtmp chunk: {m}")),
RtmpError::InvalidCommand(m) => CoreError::InvalidData(format!("rtmp command: {m}")),
RtmpError::UnsupportedHandshakeVersion(v) => {
CoreError::Unsupported(format!("rtmp handshake version 0x{v:02x}"))
}
RtmpError::Other(m) => CoreError::Other(format!("rtmp: {m}")),
}
}
#[derive(Debug, Clone)]
struct ListenUrl {
bind_addr: String,
expected_app: String,
expected_stream: String,
}
impl ListenUrl {
fn parse(uri: &str) -> CoreResult<Self> {
let s = uri
.strip_prefix("rtmp://")
.ok_or_else(|| CoreError::InvalidData(format!("not an rtmp:// URL: {uri}")))?;
let slash = s
.find('/')
.ok_or_else(|| CoreError::InvalidData(format!("rtmp URL missing /app: {uri}")))?;
let authority = &s[..slash];
let path = &s[slash + 1..];
let (host, port_str) = match authority.rsplit_once(':') {
Some((h, p)) => (h, p),
None => (authority, "1935"),
};
let port: u16 = port_str
.parse()
.map_err(|e| CoreError::InvalidData(format!("rtmp URL bad port {port_str:?}: {e}")))?;
let bind_host = if host.is_empty() { "0.0.0.0" } else { host };
let bind_addr = format!("{bind_host}:{port}");
let (app, stream_name) = match path.find('/') {
Some(i) => (path[..i].to_owned(), path[i + 1..].to_owned()),
None => (path.to_owned(), String::new()),
};
Ok(Self {
bind_addr,
expected_app: app,
expected_stream: stream_name,
})
}
}
pub fn open_rtmp(uri: &str) -> CoreResult<Box<dyn PacketSource>> {
let url = ListenUrl::parse(uri)?;
let resolved = url
.bind_addr
.to_socket_addrs()
.map_err(CoreError::Io)?
.next()
.ok_or_else(|| {
CoreError::InvalidData(format!("rtmp URL resolved no addresses: {}", url.bind_addr))
})?;
let server = RtmpServer::bind(resolved).map_err(rtmp_to_core_err)?;
let req = server.accept().map_err(rtmp_to_core_err)?;
if !url.expected_app.is_empty() && req.app != url.expected_app {
let actual = req.app.clone();
let expected = url.expected_app.clone();
let _ = req.reject("unexpected app");
return Err(CoreError::InvalidData(format!(
"rtmp publisher app mismatch: expected {expected:?}, got {actual:?}"
)));
}
if !url.expected_stream.is_empty() && req.stream_name != url.expected_stream {
let actual = req.stream_name.clone();
let expected = url.expected_stream.clone();
let _ = req.reject("unexpected stream key");
return Err(CoreError::InvalidData(format!(
"rtmp publisher stream-name mismatch: expected {expected:?}, got {actual:?}"
)));
}
let session = req.accept().map_err(rtmp_to_core_err)?;
let source = RtmpPacketSource::from_session_with_probe(session, Some(PROBE_READ_TIMEOUT))
.map_err(rtmp_to_core_err)?;
Ok(Box::new(source))
}
pub fn register(registry: &mut SourceRegistry) {
registry.register_packets("rtmp", open_rtmp);
}
#[allow(dead_code)]
fn _bytes_source_anchor(_: Box<dyn BytesSource>) {}
#[cfg(test)]
mod tests {
use super::*;
use crate::flv::{
AAC_PACKET_TYPE_RAW, AUDIO_FORMAT_EX_HEADER, AVC_PACKET_TYPE_NALU,
AVC_PACKET_TYPE_SEQUENCE_HEADER, EX_PACKET_TYPE_CODED_FRAMES, EX_PACKET_TYPE_METADATA,
EX_PACKET_TYPE_SEQUENCE_START, FOURCC_AV1, FOURCC_HEVC, FOURCC_VP9, VIDEO_FRAME_INTER,
VIDEO_FRAME_KEYFRAME,
};
#[test]
fn audio_codec_id_maps_aac_and_mp3() {
assert_eq!(audio_codec_id(AUDIO_FORMAT_AAC).as_str(), "aac");
assert_eq!(audio_codec_id(AUDIO_FORMAT_MP3).as_str(), "mp3");
assert_eq!(audio_codec_id(AUDIO_FORMAT_PCM_LE).as_str(), "pcm_s16le");
assert_eq!(audio_codec_id(0xFF).as_str(), "unknown");
}
#[test]
fn video_codec_id_maps_avc_and_h263() {
assert_eq!(video_codec_id(VIDEO_CODEC_AVC).as_str(), "h264");
assert_eq!(video_codec_id(VIDEO_CODEC_H263).as_str(), "h263");
assert_eq!(video_codec_id(VIDEO_CODEC_VP6).as_str(), "vp6f");
assert_eq!(video_codec_id(0xFF).as_str(), "unknown");
}
#[test]
fn audio_aac_seq_header_packet_carries_marker_and_header_flag() {
let tag = AudioTag {
mod_ex: Vec::new(),
sound_format: AUDIO_FORMAT_AAC,
sound_rate: 3,
sound_size_16bit: true,
stereo: true,
aac_packet_type: Some(AAC_PACKET_TYPE_SEQUENCE_HEADER),
body: vec![0x12, 0x10],
ex_packet_type: None,
audio_fourcc: None,
multitrack: None,
};
let pkt = audio_to_packet(0, &tag);
assert_eq!(pkt.stream_index, AUDIO_STREAM_INDEX);
assert_eq!(pkt.time_base, RTMP_TIME_BASE);
assert_eq!(pkt.pts, Some(0));
assert_eq!(pkt.dts, Some(0));
assert!(pkt.flags.header);
assert_eq!(pkt.data, vec![0x00, 0x12, 0x10]);
}
#[test]
fn audio_aac_raw_packet_keeps_packet_type_byte() {
let tag = AudioTag {
mod_ex: Vec::new(),
sound_format: AUDIO_FORMAT_AAC,
sound_rate: 3,
sound_size_16bit: true,
stereo: true,
aac_packet_type: Some(AAC_PACKET_TYPE_RAW),
body: vec![0xAB, 0xCD, 0xEF],
ex_packet_type: None,
audio_fourcc: None,
multitrack: None,
};
let pkt = audio_to_packet(123, &tag);
assert_eq!(pkt.pts, Some(123));
assert_eq!(pkt.dts, Some(123));
assert!(!pkt.flags.header);
assert_eq!(pkt.data, vec![0x01, 0xAB, 0xCD, 0xEF]);
}
#[test]
fn audio_mp3_packet_strips_flv_header_only() {
let tag = AudioTag {
mod_ex: Vec::new(),
sound_format: AUDIO_FORMAT_MP3,
sound_rate: 3,
sound_size_16bit: true,
stereo: true,
aac_packet_type: None,
body: vec![0xFF, 0xFB, 0x90, 0x00],
ex_packet_type: None,
audio_fourcc: None,
multitrack: None,
};
let pkt = audio_to_packet(40, &tag);
assert_eq!(pkt.data, vec![0xFF, 0xFB, 0x90, 0x00]);
assert_eq!(pkt.pts, Some(40));
}
#[test]
fn audio_codec_id_for_tag_dispatches_legacy_vs_fourcc() {
let legacy_aac = AudioTag {
mod_ex: Vec::new(),
sound_format: AUDIO_FORMAT_AAC,
sound_rate: 3,
sound_size_16bit: true,
stereo: true,
aac_packet_type: Some(AAC_PACKET_TYPE_SEQUENCE_HEADER),
body: vec![],
ex_packet_type: None,
audio_fourcc: None,
multitrack: None,
};
assert_eq!(audio_codec_id_for_tag(&legacy_aac).as_str(), "aac");
for (fcc, expected) in [
(flv::FOURCC_OPUS, "opus"),
(flv::FOURCC_FLAC, "flac"),
(flv::FOURCC_AC3, "ac3"),
(flv::FOURCC_EAC3, "eac3"),
(flv::FOURCC_MP3, "mp3"),
(flv::FOURCC_AAC, "aac"),
] {
let t = AudioTag {
mod_ex: Vec::new(),
sound_format: AUDIO_FORMAT_EX_HEADER,
sound_rate: 0,
sound_size_16bit: false,
stereo: false,
aac_packet_type: None,
ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_CODED_FRAMES),
audio_fourcc: Some(fcc),
body: vec![],
multitrack: None,
};
assert_eq!(audio_codec_id_for_tag(&t).as_str(), expected);
}
}
#[test]
fn audio_fourcc_codec_id_unknown_collapses() {
assert_eq!(audio_fourcc_codec_id(*b"xxxx").as_str(), "unknown");
}
#[test]
fn ex_opus_sequence_start_packet_sets_header_flag_and_strips_fourcc() {
let tag = AudioTag {
mod_ex: Vec::new(),
sound_format: AUDIO_FORMAT_EX_HEADER,
sound_rate: 0,
sound_size_16bit: false,
stereo: false,
aac_packet_type: None,
ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_START),
audio_fourcc: Some(flv::FOURCC_OPUS),
body: b"OpusHead\x01\x02\x38\x01\x80\xbb\x00\x00\x00\x00\x00".to_vec(),
multitrack: None,
};
let pkt = audio_to_packet(0, &tag);
assert_eq!(pkt.stream_index, AUDIO_STREAM_INDEX);
assert!(pkt.flags.header);
assert_eq!(
pkt.data,
b"OpusHead\x01\x02\x38\x01\x80\xbb\x00\x00\x00\x00\x00".to_vec()
);
}
#[test]
fn ex_ac3_coded_frames_packet_strips_fourcc_and_keeps_body() {
let tag = AudioTag {
mod_ex: Vec::new(),
sound_format: AUDIO_FORMAT_EX_HEADER,
sound_rate: 0,
sound_size_16bit: false,
stereo: false,
aac_packet_type: None,
ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_CODED_FRAMES),
audio_fourcc: Some(flv::FOURCC_AC3),
body: vec![0x0B, 0x77, 0xAB, 0xCD, 0xEF],
multitrack: None,
};
let pkt = audio_to_packet(200, &tag);
assert!(!pkt.flags.header);
assert_eq!(pkt.dts, Some(200));
assert_eq!(pkt.pts, Some(200));
assert_eq!(pkt.data, vec![0x0B, 0x77, 0xAB, 0xCD, 0xEF]);
}
#[test]
fn ex_audio_sequence_end_packet_flagged_header_with_empty_body() {
let tag = AudioTag {
mod_ex: Vec::new(),
sound_format: AUDIO_FORMAT_EX_HEADER,
sound_rate: 0,
sound_size_16bit: false,
stereo: false,
aac_packet_type: None,
ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_END),
audio_fourcc: Some(flv::FOURCC_OPUS),
body: vec![],
multitrack: None,
};
let pkt = audio_to_packet(999, &tag);
assert!(pkt.flags.header);
assert!(pkt.data.is_empty());
assert_eq!(pkt.dts, Some(999));
}
#[test]
fn ex_audio_codec_params_copies_sequence_start_to_extradata() {
let tag = AudioTag {
mod_ex: Vec::new(),
sound_format: AUDIO_FORMAT_EX_HEADER,
sound_rate: 0,
sound_size_16bit: false,
stereo: false,
aac_packet_type: None,
ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_START),
audio_fourcc: Some(flv::FOURCC_FLAC),
body: b"fLaC\x80\x00\x00\x22streaminfo-body-bytes".to_vec(),
multitrack: None,
};
let p = audio_codec_params(&tag);
assert_eq!(p.codec_id.as_str(), "flac");
assert_eq!(p.extradata, tag.body);
assert_eq!(p.sample_rate, None);
assert_eq!(p.channels, None);
}
#[test]
fn ex_audio_codec_params_ac3_coded_frames_leaves_extradata_empty() {
let tag = AudioTag {
mod_ex: Vec::new(),
sound_format: AUDIO_FORMAT_EX_HEADER,
sound_rate: 0,
sound_size_16bit: false,
stereo: false,
aac_packet_type: None,
ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_CODED_FRAMES),
audio_fourcc: Some(flv::FOURCC_AC3),
body: vec![0x0B, 0x77, 0xAB, 0xCD],
multitrack: None,
};
let p = audio_codec_params(&tag);
assert_eq!(p.codec_id.as_str(), "ac3");
assert!(p.extradata.is_empty());
}
#[test]
fn video_avc_keyframe_packet_keyframe_flag_and_no_pts_offset() {
let tag = VideoTag {
mod_ex: Vec::new(),
frame_type: VIDEO_FRAME_KEYFRAME,
codec_id: VIDEO_CODEC_AVC,
avc_packet_type: Some(AVC_PACKET_TYPE_NALU),
composition_time: 0,
body: b"\x00\x00\x00\x05hello".to_vec(),
ex_packet_type: None,
fourcc: None,
multitrack: None,
};
let pkt = video_to_packet(33, &tag);
assert_eq!(pkt.stream_index, VIDEO_STREAM_INDEX);
assert!(pkt.flags.keyframe);
assert!(!pkt.flags.header);
assert_eq!(pkt.pts, Some(33));
assert_eq!(pkt.dts, Some(33));
assert_eq!(pkt.data, b"\x00\x00\x00\x05hello".to_vec());
}
#[test]
fn video_avc_inter_packet_with_negative_cts_offsets_pts() {
let tag = VideoTag {
mod_ex: Vec::new(),
frame_type: VIDEO_FRAME_INTER,
codec_id: VIDEO_CODEC_AVC,
avc_packet_type: Some(AVC_PACKET_TYPE_NALU),
composition_time: -10,
body: vec![1, 2, 3],
ex_packet_type: None,
fourcc: None,
multitrack: None,
};
let pkt = video_to_packet(100, &tag);
assert!(!pkt.flags.keyframe);
assert_eq!(pkt.dts, Some(100));
assert_eq!(pkt.pts, Some(90));
}
#[test]
fn video_avc_seq_header_marks_header_flag() {
let tag = VideoTag {
mod_ex: Vec::new(),
frame_type: VIDEO_FRAME_KEYFRAME,
codec_id: VIDEO_CODEC_AVC,
avc_packet_type: Some(AVC_PACKET_TYPE_SEQUENCE_HEADER),
composition_time: 0,
body: b"\x01\x42\x80\x1e".to_vec(),
ex_packet_type: None,
fourcc: None,
multitrack: None,
};
let pkt = video_to_packet(0, &tag);
assert!(pkt.flags.keyframe);
assert!(pkt.flags.header);
assert_eq!(pkt.data, b"\x01\x42\x80\x1e".to_vec());
}
#[test]
fn video_h263_packet_keeps_body_and_pts_eq_dts() {
let tag = VideoTag {
mod_ex: Vec::new(),
frame_type: VIDEO_FRAME_INTER,
codec_id: VIDEO_CODEC_H263,
avc_packet_type: None,
composition_time: 0,
body: vec![0xAA, 0xBB, 0xCC],
ex_packet_type: None,
fourcc: None,
multitrack: None,
};
let pkt = video_to_packet(50, &tag);
assert_eq!(pkt.pts, pkt.dts);
assert_eq!(pkt.data, vec![0xAA, 0xBB, 0xCC]);
}
#[test]
fn video_codec_id_for_tag_dispatches_legacy_vs_fourcc() {
let avc = VideoTag {
mod_ex: Vec::new(),
frame_type: VIDEO_FRAME_KEYFRAME,
codec_id: VIDEO_CODEC_AVC,
avc_packet_type: Some(AVC_PACKET_TYPE_NALU),
composition_time: 0,
body: vec![],
ex_packet_type: None,
fourcc: None,
multitrack: None,
};
assert_eq!(video_codec_id_for_tag(&avc).as_str(), "h264");
for (fcc, expected) in [
(FOURCC_HEVC, "hevc"),
(FOURCC_AV1, "av1"),
(FOURCC_VP9, "vp9"),
] {
let t = VideoTag {
mod_ex: Vec::new(),
frame_type: VIDEO_FRAME_KEYFRAME,
codec_id: 0,
avc_packet_type: None,
composition_time: 0,
body: vec![],
ex_packet_type: Some(EX_PACKET_TYPE_CODED_FRAMES),
fourcc: Some(fcc),
multitrack: None,
};
assert_eq!(video_codec_id_for_tag(&t).as_str(), expected);
}
}
#[test]
fn ex_hevc_sequence_start_packet_sets_header_flag() {
let tag = VideoTag {
mod_ex: Vec::new(),
frame_type: VIDEO_FRAME_KEYFRAME,
codec_id: 0,
avc_packet_type: None,
composition_time: 0,
body: b"\x01hvcc-stub".to_vec(),
ex_packet_type: Some(EX_PACKET_TYPE_SEQUENCE_START),
fourcc: Some(FOURCC_HEVC),
multitrack: None,
};
let pkt = video_to_packet(0, &tag);
assert!(pkt.flags.header);
assert!(pkt.flags.keyframe);
assert_eq!(pkt.dts, Some(0));
assert_eq!(pkt.pts, Some(0));
assert_eq!(pkt.data, b"\x01hvcc-stub".to_vec());
}
#[test]
fn ex_hevc_coded_frames_with_cts_offsets_pts() {
let tag = VideoTag {
mod_ex: Vec::new(),
frame_type: VIDEO_FRAME_INTER,
codec_id: 0,
avc_packet_type: None,
composition_time: 17,
body: b"\x00\x00\x00\x04NALU".to_vec(),
ex_packet_type: Some(EX_PACKET_TYPE_CODED_FRAMES),
fourcc: Some(FOURCC_HEVC),
multitrack: None,
};
let pkt = video_to_packet(200, &tag);
assert!(!pkt.flags.keyframe);
assert!(!pkt.flags.header);
assert_eq!(pkt.dts, Some(200));
assert_eq!(pkt.pts, Some(217));
}
#[test]
fn ex_av1_coded_frames_no_cts_offset() {
let tag = VideoTag {
mod_ex: Vec::new(),
frame_type: VIDEO_FRAME_KEYFRAME,
codec_id: 0,
avc_packet_type: None,
composition_time: 0,
body: vec![0x0a, 0x0b],
ex_packet_type: Some(EX_PACKET_TYPE_CODED_FRAMES),
fourcc: Some(FOURCC_AV1),
multitrack: None,
};
let pkt = video_to_packet(500, &tag);
assert!(pkt.flags.keyframe);
assert!(!pkt.flags.header);
assert_eq!(pkt.dts, pkt.pts);
assert_eq!(pkt.dts, Some(500));
}
#[test]
fn ex_metadata_packet_ignores_frame_type_flags() {
let tag = VideoTag {
mod_ex: Vec::new(),
frame_type: VIDEO_FRAME_KEYFRAME, codec_id: 0,
avc_packet_type: None,
composition_time: 0,
body: b"amf-payload".to_vec(),
ex_packet_type: Some(EX_PACKET_TYPE_METADATA),
fourcc: Some(FOURCC_HEVC),
multitrack: None,
};
let pkt = video_to_packet(123, &tag);
assert!(!pkt.flags.keyframe);
assert!(pkt.flags.header);
assert_eq!(pkt.data, b"amf-payload".to_vec());
}
#[test]
fn legacy_avc_seq_header_constant_still_referenced() {
let _ = AVC_PACKET_TYPE_SEQUENCE_HEADER;
}
#[test]
fn listen_url_parses_host_port_app_key() {
let u = ListenUrl::parse("rtmp://127.0.0.1:1935/live/secret").expect("parse");
assert_eq!(u.bind_addr, "127.0.0.1:1935");
assert_eq!(u.expected_app, "live");
assert_eq!(u.expected_stream, "secret");
}
#[test]
fn listen_url_default_port_is_1935() {
let u = ListenUrl::parse("rtmp://0.0.0.0/live/key").expect("parse");
assert_eq!(u.bind_addr, "0.0.0.0:1935");
}
#[test]
fn listen_url_accepts_app_only_path() {
let u = ListenUrl::parse("rtmp://127.0.0.1:1935/live").expect("parse");
assert_eq!(u.expected_app, "live");
assert_eq!(u.expected_stream, "");
}
#[test]
fn listen_url_rejects_non_rtmp_scheme() {
assert!(ListenUrl::parse("http://x/y").is_err());
}
#[test]
fn listen_url_rejects_missing_path() {
assert!(ListenUrl::parse("rtmp://127.0.0.1:1935").is_err());
}
#[test]
fn flatten_metadata_keeps_scalars_and_drops_objects() {
let v = Amf0Value::Object(vec![
("width".into(), Amf0Value::Number(1280.0)),
("height".into(), Amf0Value::Number(720.0)),
("encoder".into(), Amf0Value::String("oxideav".into())),
("vhost".into(), Amf0Value::Object(vec![])),
("live".into(), Amf0Value::Boolean(true)),
]);
let mut out = Vec::new();
flatten_metadata(&v, &mut out);
assert_eq!(
out,
vec![
("width".to_string(), "1280".to_string()),
("height".to_string(), "720".to_string()),
("encoder".to_string(), "oxideav".to_string()),
("live".to_string(), "true".to_string()),
]
);
}
#[test]
fn rtmp_to_core_err_maps_unexpected_eof_to_eof() {
let core = rtmp_to_core_err(RtmpError::UnexpectedEof);
assert!(matches!(core, CoreError::Eof));
}
#[test]
fn rtmp_to_core_err_maps_protocol_violation_to_invalid_data() {
let core = rtmp_to_core_err(RtmpError::ProtocolViolation("bad chunk size".into()));
match core {
CoreError::InvalidData(s) => assert!(s.contains("bad chunk size")),
_ => panic!("expected InvalidData"),
}
}
}