use lvqr_test_utils::flv::{flv_video_nalu, flv_video_seq_header};
use lvqr_test_utils::http::{HttpGetOptions, HttpResponse, http_get_with};
use lvqr_test_utils::rtmp::{read_until, rtmp_client_handshake, send_result, send_results};
use lvqr_test_utils::{TestServer, TestServerConfig};
use rml_rtmp::sessions::{ClientSession, ClientSessionConfig, ClientSessionEvent, PublishRequestType};
use rml_rtmp::time::RtmpTimestamp;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpStream;
/// Shared deadline used by the helpers in this file: it bounds the TCP
/// connect, each `read_until` wait, and every HTTP GET issued via `http_get`.
const TIMEOUT: Duration = Duration::from_secs(10);
/// Opens an RTMP client connection to `addr` and drives it to the point
/// where publishing has been accepted.
///
/// The sequence is: TCP connect (bounded by `TIMEOUT`), RTMP handshake,
/// `connect` request for `app`, then a live publish request for
/// `stream_key`. After each request the function waits through `read_until`
/// for the matching "accepted" session event.
///
/// Returns the connected socket together with the client session so the
/// caller can keep publishing on it.
///
/// # Panics
///
/// Panics if the TCP connect times out or fails, or if any session call
/// returns an error.
async fn connect_and_publish(addr: SocketAddr, app: &str, stream_key: &str) -> (TcpStream, ClientSession) {
    let connecting = TcpStream::connect(addr);
    let mut socket = tokio::time::timeout(TIMEOUT, connecting).await.unwrap().unwrap();
    socket.set_nodelay(true).unwrap();

    // Bytes handed back by the handshake helper are fed to the session below.
    let leftover = rtmp_client_handshake(&mut socket).await;

    let (mut client, startup_packets) = ClientSession::new(ClientSessionConfig::new()).unwrap();
    send_results(&mut socket, &startup_packets).await;

    if !leftover.is_empty() {
        let replies = client.handle_input(&leftover).unwrap();
        send_results(&mut socket, &replies).await;
    }

    // NOTE(review): short pause before `connect`; presumably gives the server
    // time to settle after the handshake -- confirm whether it is still needed.
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Phase 1: connect to the application and wait for acceptance.
    let connect_packet = client.request_connection(app.to_owned()).unwrap();
    send_result(&mut socket, &connect_packet).await;
    read_until(&mut socket, &mut client, TIMEOUT, |event| {
        matches!(event, ClientSessionEvent::ConnectionRequestAccepted)
    })
    .await;

    // Phase 2: request a live publish on the stream key and wait for acceptance.
    let publish_packet = client
        .request_publishing(stream_key.to_owned(), PublishRequestType::Live)
        .unwrap();
    send_result(&mut socket, &publish_packet).await;
    read_until(&mut socket, &mut client, TIMEOUT, |event| {
        matches!(event, ClientSessionEvent::PublishRequestAccepted)
    })
    .await;

    (socket, client)
}
/// Issues an HTTP GET for `path` against `addr` using `http_get_with`,
/// overriding only the timeout (set to `TIMEOUT`) and leaving every other
/// option at its default.
async fn http_get(addr: SocketAddr, path: &str) -> HttpResponse {
    let options = HttpGetOptions {
        timeout: TIMEOUT,
        ..Default::default()
    };
    http_get_with(addr, path, options).await
}
/// Starts a publish session via `connect_and_publish` and pushes three video
/// messages on it: the sequence header at timestamp 0, then two keyframes
/// built from the same NALU payload at timestamps 0 and 2100.
///
/// Returns the socket and session so the caller decides when the publisher
/// disconnects.
///
/// # Panics
///
/// Panics if `publish_video_data` returns an error, or if
/// `connect_and_publish` panics.
async fn publish_two_keyframes(addr: SocketAddr, app: &str, key: &str) -> (TcpStream, ClientSession) {
    let (mut socket, mut client) = connect_and_publish(addr, app, key).await;

    let header_packet = client
        .publish_video_data(flv_video_seq_header(), RtmpTimestamp::new(0), false)
        .unwrap();
    send_result(&mut socket, &header_packet).await;

    let nalu = vec![0x00, 0x00, 0x00, 0x04, 0x65, 0x88, 0x84, 0x00];

    // NOTE(review): the gap between the two keyframe timestamps presumably
    // exists so the first segment gets closed -- confirm against the segmenter.
    for timestamp in [0_u32, 2100] {
        let keyframe = flv_video_nalu(true, 0, &nalu);
        let packet = client
            .publish_video_data(keyframe, RtmpTimestamp::new(timestamp), false)
            .unwrap();
        send_result(&mut socket, &packet).await;
    }

    (socket, client)
}
/// Publishes two keyframes over RTMP to a DASH-enabled `TestServer` and then
/// checks, over HTTP, that:
///
/// * the manifest for `live/test` is served and contains the expected
///   dynamic-MPD markers,
/// * the init segment carries `ftyp` at bytes 4..8,
/// * `seg-video-1.m4s` carries `styp` at bytes 4..8 and `moof` at bytes 28..32,
/// * the manifest of a stream that was never published returns 404.
#[tokio::test]
async fn rtmp_publish_reaches_dash_router() {
    // Ignore the result: another test may already have installed a subscriber.
    let _ = tracing_subscriber::fmt()
        .with_env_filter("lvqr=debug")
        .with_test_writer()
        .try_init();

    let config = TestServerConfig::default().with_dash();
    let server = TestServer::start(config)
        .await
        .expect("start TestServer with DASH");
    let (rtmp_addr, dash_addr) = (server.rtmp_addr(), server.dash_addr());

    // Both halves stay bound so the publisher remains connected during the GETs.
    let (rtmp_stream, _session) = publish_two_keyframes(rtmp_addr, "live", "test").await;

    // Fixed wait before querying the DASH endpoint.
    tokio::time::sleep(Duration::from_millis(500)).await;

    // --- manifest ---
    let mpd_response = http_get(dash_addr, "/dash/live/test/manifest.mpd").await;
    assert_eq!(mpd_response.status, 200, "manifest GET status");
    let body = std::str::from_utf8(&mpd_response.body).expect("manifest body utf-8");
    eprintln!("--- manifest.mpd ---\n{body}\n--- end ---");
    assert!(body.contains("<MPD"));
    assert!(body.contains("type=\"dynamic\""));
    assert!(body.contains("<AdaptationSet id=\"0\""));
    assert!(body.contains("seg-video-$Number$.m4s"));

    // --- init segment ---
    let init_segment = http_get(dash_addr, "/dash/live/test/init-video.m4s").await;
    assert_eq!(init_segment.status, 200, "init-video GET status");
    assert!(init_segment.body.len() >= 8, "init-video body too short");
    let init_box_type = &init_segment.body[4..8];
    assert_eq!(init_box_type, b"ftyp", "init-video segment did not start with ftyp");

    // --- first media segment ---
    let media_segment = http_get(dash_addr, "/dash/live/test/seg-video-1.m4s").await;
    assert_eq!(media_segment.status, 200, "seg-video-1 GET status");
    assert!(
        media_segment.body.len() >= 32,
        "seg-video-1 body too short for styp+moof: {} bytes",
        media_segment.body.len()
    );
    let first_box_type = &media_segment.body[4..8];
    assert_eq!(
        first_box_type,
        b"styp",
        "expected seg-video-1 to start with a styp box"
    );
    // The fixed 28..32 offset assumes the leading styp box is 24 bytes long.
    let second_box_type = &media_segment.body[28..32];
    assert_eq!(
        second_box_type,
        b"moof",
        "expected seg-video-1's body after styp to start with a moof box"
    );

    // --- stream that was never published ---
    let missing = http_get(dash_addr, "/dash/live/ghost/manifest.mpd").await;
    assert_eq!(missing.status, 404);

    drop(rtmp_stream);
    server.shutdown().await.expect("shutdown");
}
/// Verifies the manifest transition around a publisher disconnect: while the
/// RTMP publisher for `live/fin` is connected the MPD must be
/// `type="dynamic"`; after the socket and session are dropped it must be
/// `type="static"` and must no longer contain `minimumUpdatePeriod=`.
#[tokio::test]
async fn rtmp_disconnect_produces_static_dash_manifest() {
    // Ignore the result: another test may already have installed a subscriber.
    let _ = tracing_subscriber::fmt()
        .with_env_filter("lvqr=debug")
        .with_test_writer()
        .try_init();

    let config = TestServerConfig::default().with_dash();
    let server = TestServer::start(config)
        .await
        .expect("start TestServer with DASH");
    let (rtmp_addr, dash_addr) = (server.rtmp_addr(), server.dash_addr());

    let (rtmp_stream, session) = publish_two_keyframes(rtmp_addr, "live", "fin").await;
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Publisher still connected: manifest is expected to be dynamic.
    {
        let live_response = http_get(dash_addr, "/dash/live/fin/manifest.mpd").await;
        assert_eq!(live_response.status, 200);
        let body = std::str::from_utf8(&live_response.body).expect("utf-8");
        assert!(
            body.contains(r#"type="dynamic""#),
            "before disconnect, MPD must be dynamic:\n{body}"
        );
    }

    // Tear the publisher down, then wait a fixed interval before re-fetching.
    drop(rtmp_stream);
    drop(session);
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Publisher gone: manifest is expected to be finalized.
    let final_response = http_get(dash_addr, "/dash/live/fin/manifest.mpd").await;
    assert_eq!(final_response.status, 200);
    let body = std::str::from_utf8(&final_response.body).expect("utf-8");
    assert!(
        body.contains(r#"type="static""#),
        "after disconnect, MPD must be static:\n{body}"
    );
    assert!(
        !body.contains("minimumUpdatePeriod="),
        "after disconnect, MPD must omit minimumUpdatePeriod:\n{body}"
    );

    server.shutdown().await.expect("shutdown");
}