use std::net::TcpListener;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use oxideav_core::{Error as CoreError, SourceOutput, SourceRegistry};
use oxideav_rtmp::{
register, RtmpClient, AUDIO_STREAM_INDEX, RTMP_MS_TO_NS, RTMP_TIME_BASE, VIDEO_STREAM_INDEX,
};
fn pick_port() -> u16 {
let listener = TcpListener::bind("127.0.0.1:0").expect("scratch bind");
let port = listener.local_addr().expect("local_addr").port();
drop(listener);
port
}
const APP: &str = "live";
const STREAM_KEY: &str = "secret-key";
#[test]
fn registry_open_rtmp_url_yields_packet_source_with_expected_packets() {
let port = pick_port();
let url = format!("rtmp://127.0.0.1:{port}/{APP}/{STREAM_KEY}");
let (tx, rx) = mpsc::channel::<Result<(), String>>();
let push_url = url.clone();
let publisher = thread::spawn(move || {
thread::sleep(Duration::from_millis(150));
let mut client = match RtmpClient::connect(&push_url) {
Ok(c) => c,
Err(e) => {
let _ = tx.send(Err(format!("client connect: {e}")));
return;
}
};
let push = |c: &mut RtmpClient| -> Result<(), String> {
let avc_c = b"\x01\x42\x80\x1e\x00".to_vec();
c.send_video_sequence_header(&avc_c)
.map_err(|e| format!("send avc seq: {e}"))?;
let asc = vec![0x12, 0x10];
c.send_audio_sequence_header(&asc)
.map_err(|e| format!("send aac seq: {e}"))?;
let nalu_k: Vec<u8> = (0..200).map(|i| i as u8).collect();
let nalu_p: Vec<u8> = (0u32..120).map(|i| (i.wrapping_mul(3)) as u8).collect();
let nalu_q: Vec<u8> = (0u32..80)
.map(|i| (i.wrapping_mul(7).wrapping_add(1)) as u8)
.collect();
c.send_video(0, true, &nalu_k)
.map_err(|e| format!("send video k: {e}"))?;
c.send_video(33, false, &nalu_p)
.map_err(|e| format!("send video p: {e}"))?;
c.send_video(66, false, &nalu_q)
.map_err(|e| format!("send video q: {e}"))?;
let aac_a: Vec<u8> = (0..180)
.map(|i: u32| ((i.wrapping_add(5)).wrapping_mul(11)) as u8)
.collect();
let aac_b: Vec<u8> = (0..180)
.map(|i: u32| ((i.wrapping_add(40)).wrapping_mul(13)) as u8)
.collect();
c.send_audio(20, &aac_a)
.map_err(|e| format!("send audio a: {e}"))?;
c.send_audio(43, &aac_b)
.map_err(|e| format!("send audio b: {e}"))?;
Ok(())
};
let push_result = push(&mut client);
thread::sleep(Duration::from_millis(50));
let result =
push_result.and_then(|()| client.close().map_err(|e| format!("client close: {e}")));
let _ = tx.send(result);
});
let mut reg = SourceRegistry::new();
register(&mut reg);
let mut source = match reg.open(&url).expect("open rtmp source") {
SourceOutput::Packets(p) => p,
other => panic!(
"expected SourceOutput::Packets, got {:?}",
std::mem::discriminant(&other)
),
};
let streams = source.streams().to_vec();
assert_eq!(streams.len(), 2, "expected 2 streams (audio+video)");
assert_eq!(streams[0].index, AUDIO_STREAM_INDEX);
assert_eq!(streams[0].time_base, RTMP_TIME_BASE);
assert_eq!(streams[0].params.codec_id.as_str(), "aac");
assert_eq!(streams[1].index, VIDEO_STREAM_INDEX);
assert_eq!(streams[1].time_base, RTMP_TIME_BASE);
assert_eq!(streams[1].params.codec_id.as_str(), "h264");
assert_eq!(streams[0].params.extradata, vec![0x12, 0x10]);
assert_eq!(
streams[1].params.extradata,
b"\x01\x42\x80\x1e\x00".to_vec()
);
let mut packets = Vec::new();
let deadline = std::time::Instant::now() + Duration::from_secs(5);
loop {
if std::time::Instant::now() >= deadline {
panic!(
"deadline before EOF; got {} packets so far: {:?}",
packets.len(),
packets
.iter()
.map(|p: &oxideav_core::Packet| (p.stream_index, p.pts))
.collect::<Vec<_>>()
);
}
match source.next_packet() {
Ok(p) => packets.push(p),
Err(CoreError::Eof) => break,
Err(e) => panic!("unexpected error from next_packet(): {e}"),
}
}
drop(source);
publisher.join().expect("publisher thread joined");
rx.recv_timeout(Duration::from_secs(1))
.expect("publisher result delivered")
.expect("publisher succeeded");
let audio: Vec<_> = packets
.iter()
.filter(|p| p.stream_index == AUDIO_STREAM_INDEX)
.collect();
let video: Vec<_> = packets
.iter()
.filter(|p| p.stream_index == VIDEO_STREAM_INDEX)
.collect();
assert_eq!(
audio.len(),
3,
"expected 3 audio packets, got {}",
audio.len()
);
assert_eq!(
video.len(),
4,
"expected 4 video packets, got {}",
video.len()
);
assert_eq!(audio[0].pts, Some(0));
assert_eq!(audio[0].dts, Some(0));
assert!(audio[0].flags.header);
assert_eq!(audio[0].data, vec![0x00, 0x12, 0x10]);
assert_eq!(audio[1].pts, Some(20 * RTMP_MS_TO_NS));
assert!(!audio[1].flags.header);
assert_eq!(audio[1].data[0], 0x01); assert_eq!(audio[2].pts, Some(43 * RTMP_MS_TO_NS));
assert_eq!(video[0].pts, Some(0));
assert_eq!(video[0].dts, Some(0));
assert!(video[0].flags.keyframe);
assert!(video[0].flags.header);
assert_eq!(video[0].data, b"\x01\x42\x80\x1e\x00".to_vec());
assert_eq!(video[1].pts, Some(0));
assert!(video[1].flags.keyframe);
assert!(!video[1].flags.header);
let nalu_k: Vec<u8> = (0..200).map(|i| i as u8).collect();
assert_eq!(video[1].data, nalu_k);
assert_eq!(video[2].pts, Some(33 * RTMP_MS_TO_NS));
assert!(!video[2].flags.keyframe);
let nalu_p: Vec<u8> = (0u32..120).map(|i| (i.wrapping_mul(3)) as u8).collect();
assert_eq!(video[2].data, nalu_p);
assert_eq!(video[3].pts, Some(66 * RTMP_MS_TO_NS));
let nalu_q: Vec<u8> = (0u32..80)
.map(|i| (i.wrapping_mul(7).wrapping_add(1)) as u8)
.collect();
assert_eq!(video[3].data, nalu_q);
}
#[test]
fn registry_open_rtmp_rejects_mismatched_stream_key() {
let port = pick_port();
let url = format!("rtmp://127.0.0.1:{port}/{APP}/{STREAM_KEY}");
let bad_url = format!("rtmp://127.0.0.1:{port}/{APP}/wrong-key");
let publisher = thread::spawn(move || {
thread::sleep(Duration::from_millis(150));
let _ = RtmpClient::connect(&bad_url);
});
let mut reg = SourceRegistry::new();
register(&mut reg);
let result = reg.open(&url);
match result {
Ok(_) => panic!("expected open() to fail on stream-name mismatch"),
Err(CoreError::InvalidData(msg)) => {
assert!(
msg.contains("stream-name"),
"error should mention stream-name: got {msg:?}"
);
}
Err(e) => panic!("expected InvalidData, got {e}"),
}
publisher.join().expect("publisher thread joined");
}