#![cfg(feature = "transcode")]
use std::path::PathBuf;
use std::time::{Duration, Instant};
use bytes::Bytes;
use lvqr_fragment::{Fragment, FragmentBroadcasterRegistry, FragmentFlags, FragmentMeta, FragmentStream};
use lvqr_transcode::{RenditionSpec, SoftwareTranscoderFactory, TranscodeRunner};
const FIXTURE_REL: &str = "../lvqr-conformance/fixtures/fmp4/cmaf-h264-baseline-360p-1s.mp4";
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn software_ladder_emits_three_renditions() {
let probe_registry = FragmentBroadcasterRegistry::new();
let probe = SoftwareTranscoderFactory::new(RenditionSpec::preset_720p(), probe_registry);
if !probe.is_available() {
eprintln!(
"skipping software_ladder: required GStreamer elements missing {:?}. \
Install gstreamer + plugin set (base, good, bad, ugly, libav) and re-run.",
probe.missing_elements()
);
return;
}
drop(probe);
let registry = FragmentBroadcasterRegistry::new();
let output_registry = registry.clone();
let _handle = TranscodeRunner::new()
.with_ladder(RenditionSpec::default_ladder(), move |spec| {
SoftwareTranscoderFactory::new(spec, output_registry.clone())
})
.install(®istry);
let fixture_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(FIXTURE_REL);
let bytes = std::fs::read(&fixture_path).expect("read cmaf-h264-baseline-360p-1s.mp4");
let (init, frag_body) = split_init_and_remainder(&bytes);
assert!(!init.is_empty(), "fixture must have ftyp+moov");
assert!(!frag_body.is_empty(), "fixture must have moof+mdat");
let source_meta = FragmentMeta::new("avc1.42c01f", 12_800).with_init_segment(Bytes::copy_from_slice(init));
let source_bc = registry.get_or_create("live/demo", "0.mp4", source_meta);
source_bc.emit(Fragment::new(
"0.mp4",
0,
0,
0,
0,
0,
12_800,
FragmentFlags::KEYFRAME,
Bytes::copy_from_slice(frag_body),
));
let rendition_names = ["720p", "480p", "240p"];
let expected_broadcasts = rendition_names.map(|r| format!("live/demo/{r}"));
let deadline = Instant::now() + Duration::from_secs(10);
loop {
let all_present = expected_broadcasts
.iter()
.all(|name| registry.get(name, "0.mp4").is_some());
if all_present {
break;
}
if Instant::now() >= deadline {
panic!(
"output broadcasts did not appear on the registry within 10s; saw keys: {:?}",
registry.keys()
);
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
let mut subs: Vec<_> = expected_broadcasts
.iter()
.map(|name| {
let bc = registry
.get(name, "0.mp4")
.unwrap_or_else(|| panic!("output broadcast {name} missing"));
(name.clone(), bc.subscribe())
})
.collect();
drop(source_bc);
registry.remove("live/demo", "0.mp4");
let mut per_rendition: Vec<(String, usize, usize)> = Vec::new();
for (name, mut sub) in subs.drain(..) {
let mut count = 0usize;
let mut total_bytes = 0usize;
let drain_deadline = Instant::now() + Duration::from_secs(20);
loop {
if Instant::now() >= drain_deadline {
break;
}
match tokio::time::timeout(Duration::from_secs(8), sub.next_fragment()).await {
Ok(Some(f)) => {
count += 1;
total_bytes += f.payload.len();
}
Ok(None) => break,
Err(_) => break,
}
if count >= 32 {
break;
}
}
let has_init = registry
.get(&name, "0.mp4")
.and_then(|bc| bc.meta().init_segment.clone())
.map(|b| !b.is_empty())
.unwrap_or(false);
eprintln!("rendition {name}: fragments={count}, bytes={total_bytes}, has_init={has_init}",);
per_rendition.push((name, count, total_bytes));
}
for name in &expected_broadcasts {
registry.remove(name, "0.mp4");
}
for (name, count, bytes) in &per_rendition {
assert!(
*count >= 1,
"rendition {name}: expected >= 1 output fragment, got {count}",
);
assert!(*bytes > 0, "rendition {name}: expected > 0 bytes, got {bytes}",);
}
for (name, _count, bytes) in &per_rendition {
let target_kbps = if name.ends_with("/720p") {
2_500
} else if name.ends_with("/480p") {
1_200
} else if name.ends_with("/240p") {
400
} else {
panic!("unexpected rendition {name}");
};
let measured_kbps = (*bytes as u64 * 8 / 1_000) as u32;
let low = (target_kbps as f64 * 0.6).floor() as u32;
let high = (target_kbps as f64 * 1.4).ceil() as u32;
assert!(
(low..=high).contains(&measured_kbps),
"rendition {name}: measured {measured_kbps} kbps outside coarse \
window [{low}, {high}] (target {target_kbps})",
);
}
let bytes_for = |suffix: &str| -> usize {
per_rendition
.iter()
.find(|(n, _, _)| n.ends_with(suffix))
.map(|(_, _, b)| *b)
.unwrap_or(0)
};
let b720 = bytes_for("720p");
let b240 = bytes_for("240p");
assert!(
b720 > b240,
"720p output ({b720} bytes) must exceed 240p output ({b240} bytes); the ladder rungs may be miswired",
);
}
fn split_init_and_remainder(file: &[u8]) -> (&[u8], &[u8]) {
let mut offset = 0usize;
while offset + 8 <= file.len() {
let size_word = u32::from_be_bytes(file[offset..offset + 4].try_into().unwrap());
let box_type = &file[offset + 4..offset + 8];
let box_size = if size_word == 1 {
if offset + 16 > file.len() {
return (file, &[]);
}
u64::from_be_bytes(file[offset + 8..offset + 16].try_into().unwrap()) as usize
} else if size_word == 0 {
return (file, &[]);
} else {
size_word as usize
};
if box_type == b"moof" {
return (&file[..offset], &file[offset..]);
}
offset = offset.saturating_add(box_size);
}
(file, &[])
}