use crate::{
MultiPassConfig, MultiPassEncoder, MultiPassMode, NormalizationConfig, ProgressTracker,
QualityConfig, Result, TranscodeError, TranscodeOutput,
};
use oximedia_container::{
demux::{Demuxer, FlacDemuxer, MatroskaDemuxer, OggDemuxer, WavDemuxer},
mux::{MatroskaMuxer, MuxerConfig, OggMuxer},
probe_format, ContainerFormat, Muxer, StreamInfo,
};
use oximedia_io::FileSource;
use oximedia_metering::{LoudnessMeter, MeterConfig, Standard};
use std::path::PathBuf;
use std::time::Instant;
use tracing::{debug, info, warn};
/// Size of the buffer read from the start of the input and handed to `probe_format`.
const PROBE_BYTES: usize = 16 * 1024;
/// Sample rate given to the loudness meter configuration during audio analysis.
const DEFAULT_SAMPLE_RATE: f64 = 48_000.0;
/// Channel count given to the loudness meter configuration during audio analysis.
const DEFAULT_CHANNELS: usize = 2;
/// Byte and packet counters collected while packets are copied from a demuxer to a muxer.
#[derive(Debug, Clone, Default)]
struct PassStats {
/// Sum of packet payload lengths as read from the demuxer.
bytes_in: u64,
/// Sum of packet payload lengths handed to the muxer (after optional gain).
bytes_out: u64,
/// Number of copied packets whose stream index is not an audio stream.
video_frames: u64,
/// Number of copied packets whose stream index is an audio stream.
audio_frames: u64,
}
/// Probes the container format of the file at `path`.
///
/// Reads up to `PROBE_BYTES` from the start of the file with a single `read`
/// call and passes whatever was returned to `probe_format`.
///
/// # Errors
///
/// * `TranscodeError::IoError` when the file cannot be opened or read.
/// * `TranscodeError::ContainerError` when probing fails.
async fn detect_format(path: &std::path::Path) -> Result<ContainerFormat> {
    use oximedia_io::MediaSource;

    let mut reader = match FileSource::open(path).await {
        Ok(opened) => opened,
        Err(err) => return Err(TranscodeError::IoError(err.to_string())),
    };

    let mut head = vec![0u8; PROBE_BYTES];
    let filled = match reader.read(&mut head).await {
        Ok(count) => count,
        Err(err) => return Err(TranscodeError::IoError(err.to_string())),
    };
    // Only the bytes actually read are meaningful to the prober.
    head.truncate(filled);

    match probe_format(&head) {
        Ok(probe) => Ok(probe.format),
        Err(err) => Err(TranscodeError::ContainerError(err.to_string())),
    }
}
/// Chooses the output container from the extension of `path`.
///
/// The comparison is case-insensitive. A missing, non-UTF-8 or unrecognised
/// extension falls back to Matroska.
fn output_format_from_path(path: &std::path::Path) -> ContainerFormat {
    // No usable extension becomes the empty string, which hits the fallback arm.
    let extension = path
        .extension()
        .and_then(std::ffi::OsStr::to_str)
        .map(str::to_lowercase)
        .unwrap_or_default();

    match extension.as_str() {
        "mkv" | "webm" => ContainerFormat::Matroska,
        "ogg" | "oga" | "opus" => ContainerFormat::Ogg,
        "flac" => ContainerFormat::Flac,
        "wav" => ContainerFormat::Wav,
        _ => ContainerFormat::Matroska,
    }
}
/// Stage that a [`Pipeline`] is currently in, as reported by `Pipeline::current_stage`.
#[derive(Debug, Clone)]
pub enum PipelineStage {
/// Input/output path validation; also the stage of a freshly created pipeline.
Validation,
/// Loudness scan, entered only when a normalization config is present.
AudioAnalysis,
/// Multi-pass pass number 1.
FirstPass,
/// Multi-pass pass number 2.
SecondPass,
/// Any multi-pass pass number other than 1 or 2.
ThirdPass,
/// The single pass used when no multi-pass config is present.
Encode,
/// Final check of the written output file.
Verification,
}
/// Settings consumed by [`Pipeline`].
#[derive(Debug, Clone)]
pub struct PipelineConfig {
/// Path of the file to read.
pub input: PathBuf,
/// Path of the file to write; its extension selects the output container.
pub output: PathBuf,
/// Requested video codec. The visible remux path only logs this value.
pub video_codec: Option<String>,
/// Requested audio codec. The visible remux path only logs this value.
pub audio_codec: Option<String>,
/// Quality settings. NOTE(review): not read anywhere in this file — confirm where it is consumed.
pub quality: Option<QualityConfig>,
/// When set, `Pipeline::execute` drives a `MultiPassEncoder` instead of a single pass.
pub multipass: Option<MultiPassConfig>,
/// When set, `Pipeline::execute` runs the loudness analysis stage first.
pub normalization: Option<NormalizationConfig>,
/// NOTE(review): not read anywhere in this file — confirm where it is consumed.
pub track_progress: bool,
/// NOTE(review): not read anywhere in this file — confirm where it is consumed.
pub hw_accel: bool,
}
/// Runs validation, optional loudness analysis, the copy pass(es) and output verification.
pub struct Pipeline {
// Settings supplied at construction.
config: PipelineConfig,
// Stage most recently entered by `execute`.
current_stage: PipelineStage,
// Optional tracker installed via `set_progress_tracker`; forwarded to the packet drain.
progress_tracker: Option<ProgressTracker>,
// Gain in dB computed by `analyze_audio`; 0.0 until an analysis succeeds.
normalization_gain_db: f64,
// Set right before the first pass starts; used to compute the encoding time.
encode_start: Option<Instant>,
// Counters summed over the passes that copied packets.
accumulated_stats: PassStats,
}
impl Pipeline {
/// Builds an idle pipeline around `config`.
///
/// The pipeline starts in [`PipelineStage::Validation`] with no progress
/// tracker, zero normalization gain, no start time and zeroed counters.
#[must_use]
pub fn new(config: PipelineConfig) -> Self {
    let accumulated_stats = PassStats::default();
    Self {
        accumulated_stats,
        encode_start: None,
        normalization_gain_db: 0.0,
        progress_tracker: None,
        current_stage: PipelineStage::Validation,
        config,
    }
}
/// Installs `tracker`, discarding any tracker that was set before.
pub fn set_progress_tracker(&mut self, tracker: ProgressTracker) {
    drop(self.progress_tracker.replace(tracker));
}
/// Runs the whole pipeline and returns a summary of the written output.
///
/// Stages, in order: path validation, loudness analysis (only when a
/// normalization config is present), either the multi-pass loop or a single
/// pass, and finally output verification.
///
/// # Errors
///
/// Returns the first error raised by any stage. When a multi-pass pass fails,
/// the encoder's `cleanup` still runs and the pass error is the one reported.
pub async fn execute(&mut self) -> Result<TranscodeOutput> {
    // Start from zeroed counters so that running the same pipeline twice does
    // not report the sum of both runs.
    self.accumulated_stats = Default::default();

    self.current_stage = PipelineStage::Validation;
    self.validate()?;

    if self.config.normalization.is_some() {
        self.current_stage = PipelineStage::AudioAnalysis;
        self.analyze_audio().await?;
    }

    self.encode_start = Some(Instant::now());

    if let Some(multipass_config) = self.config.multipass.clone() {
        let mut encoder = MultiPassEncoder::new(multipass_config);
        let mut pass_result: Result<()> = Ok(());

        while encoder.has_more_passes() {
            let pass = encoder.current_pass();
            self.current_stage = match pass {
                1 => PipelineStage::FirstPass,
                2 => PipelineStage::SecondPass,
                _ => PipelineStage::ThirdPass,
            };
            if let Err(err) = self.execute_pass(pass, &encoder).await {
                pass_result = Err(err);
                break;
            }
            encoder.next_pass();
        }

        // Always give the encoder the chance to clean up, even after a failed
        // pass; the pass failure takes priority over a cleanup failure.
        let cleanup_result = encoder.cleanup();
        pass_result?;
        cleanup_result?;
    } else {
        self.current_stage = PipelineStage::Encode;
        let stats = self.execute_single_pass().await?;
        self.accumulated_stats.bytes_in += stats.bytes_in;
        self.accumulated_stats.bytes_out += stats.bytes_out;
        self.accumulated_stats.video_frames += stats.video_frames;
        self.accumulated_stats.audio_frames += stats.audio_frames;
    }

    self.current_stage = PipelineStage::Verification;
    self.verify_output().await
}
#[must_use]
pub fn current_stage(&self) -> &PipelineStage {
&self.current_stage
}
/// Checks the configured input and output paths with the crate validators.
///
/// # Errors
///
/// * `TranscodeError::InvalidInput` / `TranscodeError::InvalidOutput` when the
///   respective path is not valid UTF-8.
/// * Whatever `InputValidator::validate_path` or
///   `OutputValidator::validate_path` reports.
fn validate(&self) -> Result<()> {
    use crate::validation::{InputValidator, OutputValidator};

    let input = match self.config.input.to_str() {
        Some(text) => text,
        None => {
            return Err(TranscodeError::InvalidInput(
                "Invalid input path".to_string(),
            ))
        }
    };
    InputValidator::validate_path(input)?;

    let output = match self.config.output.to_str() {
        Some(text) => text,
        None => {
            return Err(TranscodeError::InvalidOutput(
                "Invalid output path".to_string(),
            ))
        }
    };
    OutputValidator::validate_path(output, true)?;

    Ok(())
}
/// Measures the input's loudness and derives the normalization gain.
///
/// Does nothing when no normalization config is present. Otherwise the input
/// is demuxed once, its audio packets are fed to a loudness meter, and
/// `normalization_gain_db` is set to the smaller of "gain needed to reach the
/// target loudness" and "headroom left below the maximum true peak".
///
/// The gain is left at 0 dB when the container format is not supported for
/// analysis or when the measurement is not a finite number (for example when
/// no audio was metered), so that a meaningless gain is never applied.
///
/// NOTE(review): the meter is always configured with `Standard::EbuR128`,
/// `DEFAULT_SAMPLE_RATE` and `DEFAULT_CHANNELS`, independent of the input's
/// real parameters and of `norm_config.standard` — confirm this is intended.
///
/// # Errors
///
/// * `TranscodeError::IoError` when the input cannot be opened.
/// * `TranscodeError::ContainerError` when probing the container fails.
/// * `TranscodeError::NormalizationError` when the meter cannot be created.
async fn analyze_audio(&mut self) -> Result<()> {
    let norm_config = match &self.config.normalization {
        Some(c) => c.clone(),
        None => return Ok(()),
    };

    // A new analysis must not inherit the gain of an earlier one if it bails
    // out early below.
    self.normalization_gain_db = 0.0;

    let target_lufs = norm_config.standard.target_lufs();
    info!(
        "Analysing audio loudness for normalization (target: {} LUFS)",
        target_lufs
    );

    let format = detect_format(&self.config.input).await?;
    let sample_rate = DEFAULT_SAMPLE_RATE;
    let channels = DEFAULT_CHANNELS;
    let meter_config = MeterConfig::minimal(Standard::EbuR128, sample_rate, channels);
    let mut meter = LoudnessMeter::new(meter_config)
        .map_err(|e| TranscodeError::NormalizationError(e.to_string()))?;

    match format {
        ContainerFormat::Matroska => {
            let source = FileSource::open(&self.config.input)
                .await
                .map_err(|e| TranscodeError::IoError(e.to_string()))?;
            let mut demuxer = MatroskaDemuxer::new(source);
            demuxer
                .probe()
                .await
                .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
            feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
        }
        ContainerFormat::Ogg => {
            let source = FileSource::open(&self.config.input)
                .await
                .map_err(|e| TranscodeError::IoError(e.to_string()))?;
            let mut demuxer = OggDemuxer::new(source);
            demuxer
                .probe()
                .await
                .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
            feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
        }
        ContainerFormat::Wav => {
            let source = FileSource::open(&self.config.input)
                .await
                .map_err(|e| TranscodeError::IoError(e.to_string()))?;
            let mut demuxer = WavDemuxer::new(source);
            demuxer
                .probe()
                .await
                .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
            feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
        }
        ContainerFormat::Flac => {
            let source = FileSource::open(&self.config.input)
                .await
                .map_err(|e| TranscodeError::IoError(e.to_string()))?;
            let mut demuxer = FlacDemuxer::new(source);
            demuxer
                .probe()
                .await
                .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
            feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
        }
        other => {
            warn!(
                "Audio analysis: unsupported format {:?} — skipping loudness scan",
                other
            );
            return Ok(());
        }
    }

    let metrics = meter.metrics();
    let measured_lufs = metrics.integrated_lufs;
    let measured_peak = metrics.true_peak_dbtp;

    // A non-finite loudness means there was nothing to measure; deriving a
    // gain from it would produce an infinite (or NaN) gain that later clips
    // every sample.
    if !measured_lufs.is_finite() {
        warn!(
            "Audio analysis: integrated loudness is not measurable ({} LUFS) — \
             leaving normalization gain at 0 dB",
            measured_lufs
        );
        return Ok(());
    }

    let max_peak = norm_config.standard.max_true_peak_dbtp();
    let loudness_gain = target_lufs - measured_lufs;
    let peak_headroom = max_peak - measured_peak;
    // Never boost beyond what the true-peak ceiling allows.
    let gain_db = loudness_gain.min(peak_headroom);

    if !gain_db.is_finite() {
        warn!(
            "Audio analysis: computed gain is not finite ({} dB) — \
             leaving normalization gain at 0 dB",
            gain_db
        );
        return Ok(());
    }

    self.normalization_gain_db = gain_db;
    info!(
        "Audio analysis complete: measured {:.1} LUFS / {:.1} dBTP, \
         required gain {:.2} dB",
        measured_lufs, measured_peak, self.normalization_gain_db
    );
    Ok(())
}
/// Runs one pass of a multi-pass job.
///
/// Pass 1 only demuxes the input and counts its packets. Every later pass
/// performs the full copy via `execute_single_pass`.
///
/// # Errors
///
/// Propagates any error from the underlying pass.
async fn execute_pass(&mut self, pass: u32, _encoder: &MultiPassEncoder) -> Result<()> {
    info!("Starting encode pass {}", pass);
    if pass == 1 {
        self.demux_and_count().await?;
    } else {
        // Each copy pass writes the same output path again, so only the
        // counters of the latest pass describe the file on disk. Summing them
        // (as a third pass would) doubles the reported bytes and frames.
        self.accumulated_stats = self.execute_single_pass().await?;
    }
    Ok(())
}
async fn demux_and_count(&self) -> Result<u64> {
let format = detect_format(&self.config.input).await?;
let count = match format {
ContainerFormat::Matroska => {
let source = FileSource::open(&self.config.input)
.await
.map_err(|e| TranscodeError::IoError(e.to_string()))?;
let mut demuxer = MatroskaDemuxer::new(source);
demuxer
.probe()
.await
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
count_packets(&mut demuxer).await
}
ContainerFormat::Ogg => {
let source = FileSource::open(&self.config.input)
.await
.map_err(|e| TranscodeError::IoError(e.to_string()))?;
let mut demuxer = OggDemuxer::new(source);
demuxer
.probe()
.await
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
count_packets(&mut demuxer).await
}
ContainerFormat::Wav => {
let source = FileSource::open(&self.config.input)
.await
.map_err(|e| TranscodeError::IoError(e.to_string()))?;
let mut demuxer = WavDemuxer::new(source);
demuxer
.probe()
.await
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
count_packets(&mut demuxer).await
}
ContainerFormat::Flac => {
let source = FileSource::open(&self.config.input)
.await
.map_err(|e| TranscodeError::IoError(e.to_string()))?;
let mut demuxer = FlacDemuxer::new(source);
demuxer
.probe()
.await
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
count_packets(&mut demuxer).await
}
other => {
debug!("demux_and_count: unsupported format {:?}", other);
0
}
};
info!("Analysis pass: counted {} packets in input", count);
Ok(count)
}
async fn execute_single_pass(&self) -> Result<PassStats> {
let input_path = &self.config.input;
let output_path = &self.config.output;
info!(
"Single-pass transcode: {} → {}",
input_path.display(),
output_path.display()
);
let in_format = detect_format(input_path).await?;
let out_format = output_format_from_path(output_path);
debug!(
"Input format: {:?}, output format: {:?}",
in_format, out_format
);
if let Some(parent) = output_path.parent() {
if !parent.as_os_str().is_empty() && !parent.exists() {
#[cfg(not(target_arch = "wasm32"))]
{
tokio::fs::create_dir_all(parent)
.await
.map_err(|e| TranscodeError::IoError(e.to_string()))?;
}
#[cfg(target_arch = "wasm32")]
{
return Err(TranscodeError::IoError(
"Filesystem operations not supported on wasm32".to_string(),
));
}
}
}
let stats = match in_format {
ContainerFormat::Matroska => {
let source = FileSource::open(input_path)
.await
.map_err(|e| TranscodeError::IoError(e.to_string()))?;
let mut demuxer = MatroskaDemuxer::new(source);
demuxer
.probe()
.await
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
self.remux(&mut demuxer, out_format, output_path).await?
}
ContainerFormat::Ogg => {
let source = FileSource::open(input_path)
.await
.map_err(|e| TranscodeError::IoError(e.to_string()))?;
let mut demuxer = OggDemuxer::new(source);
demuxer
.probe()
.await
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
self.remux(&mut demuxer, out_format, output_path).await?
}
ContainerFormat::Wav => {
let source = FileSource::open(input_path)
.await
.map_err(|e| TranscodeError::IoError(e.to_string()))?;
let mut demuxer = WavDemuxer::new(source);
demuxer
.probe()
.await
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
self.remux(&mut demuxer, out_format, output_path).await?
}
ContainerFormat::Flac => {
let source = FileSource::open(input_path)
.await
.map_err(|e| TranscodeError::IoError(e.to_string()))?;
let mut demuxer = FlacDemuxer::new(source);
demuxer
.probe()
.await
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
self.remux(&mut demuxer, out_format, output_path).await?
}
other => {
return Err(TranscodeError::ContainerError(format!(
"Unsupported input container format: {:?}",
other
)));
}
};
Ok(stats)
}
async fn remux<D>(
&self,
demuxer: &mut D,
out_format: ContainerFormat,
output_path: &std::path::Path,
) -> Result<PassStats>
where
D: Demuxer,
{
let streams: Vec<StreamInfo> = demuxer.streams().to_vec();
if streams.is_empty() {
return Err(TranscodeError::ContainerError(
"Input container has no streams".to_string(),
));
}
let audio_stream_indices: Vec<usize> = streams
.iter()
.filter(|s| s.is_audio())
.map(|s| s.index)
.collect();
if let Some(ref vc) = self.config.video_codec {
debug!("Video codec override requested: {} (stream-copy path)", vc);
}
if let Some(ref ac) = self.config.audio_codec {
debug!("Audio codec override requested: {} (stream-copy path)", ac);
}
if self.normalization_gain_db.abs() > 0.01 {
info!(
"Normalization gain {:.2} dB will be applied to {} audio stream(s)",
self.normalization_gain_db,
audio_stream_indices.len()
);
}
let mux_config = MuxerConfig::new().with_writing_app("OxiMedia-Transcode");
let stats = match out_format {
ContainerFormat::Matroska => {
let sink = FileSource::create(output_path)
.await
.map_err(|e| TranscodeError::IoError(e.to_string()))?;
let mut muxer = MatroskaMuxer::new(sink, mux_config);
for stream in &streams {
muxer
.add_stream(stream.clone())
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
}
muxer
.write_header()
.await
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
let stats = drain_packets_with_gain(
demuxer,
&mut muxer,
&self.progress_tracker,
&audio_stream_indices,
self.normalization_gain_db,
)
.await?;
muxer
.write_trailer()
.await
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
stats
}
ContainerFormat::Ogg => {
let sink = FileSource::create(output_path)
.await
.map_err(|e| TranscodeError::IoError(e.to_string()))?;
let mut muxer = OggMuxer::new(sink, mux_config);
for stream in &streams {
muxer
.add_stream(stream.clone())
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
}
muxer
.write_header()
.await
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
let stats = drain_packets_with_gain(
demuxer,
&mut muxer,
&self.progress_tracker,
&audio_stream_indices,
self.normalization_gain_db,
)
.await?;
muxer
.write_trailer()
.await
.map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
stats
}
other => {
return Err(TranscodeError::ContainerError(format!(
"Unsupported output container format: {:?}",
other
)));
}
};
Ok(stats)
}
/// Checks the written output file and assembles the result summary.
///
/// The duration is an estimate derived from the file size alone
/// (`derive_duration_approx`); the bitrates split the bytes written in
/// proportion to the video/audio packet counts. All of these are
/// approximations, not values read from the container.
///
/// # Errors
///
/// * `TranscodeError::IoError` when the output's metadata cannot be read.
/// * `TranscodeError::PipelineError` when the output file is empty.
async fn verify_output(&self) -> Result<TranscodeOutput> {
    let output_path = &self.config.output;

    let unreadable = |err: std::io::Error| {
        TranscodeError::IoError(format!(
            "Output file '{}' not found or unreadable: {}",
            output_path.display(),
            err
        ))
    };
    #[cfg(not(target_arch = "wasm32"))]
    let metadata = tokio::fs::metadata(output_path)
        .await
        .map_err(unreadable)?;
    #[cfg(target_arch = "wasm32")]
    let metadata = std::fs::metadata(output_path).map_err(unreadable)?;

    let file_size = metadata.len();
    if file_size == 0 {
        return Err(TranscodeError::PipelineError(
            "Output file is empty — transcode may have failed".to_string(),
        ));
    }

    let encoding_time = self
        .encode_start
        .map_or(0.0, |started| started.elapsed().as_secs_f64());
    let duration_approx = derive_duration_approx(file_size);
    let speed_factor = if encoding_time > 0.0 && duration_approx > 0.0 {
        duration_approx / encoding_time
    } else {
        1.0
    };

    let counters = &self.accumulated_stats;
    let total_frames = counters.video_frames + counters.audio_frames;
    // Share of the written bytes attributed to `frames` packets, as bits/s.
    let approx_bitrate = |frames: u64| -> u64 {
        if duration_approx > 0.0 && frames > 0 && total_frames > 0 {
            let fraction = frames as f64 / total_frames as f64;
            ((counters.bytes_out as f64 * fraction * 8.0) / duration_approx) as u64
        } else {
            0u64
        }
    };
    let video_bitrate = approx_bitrate(counters.video_frames);
    let audio_bitrate = approx_bitrate(counters.audio_frames);

    info!(
        "Transcode complete: {} video frames, {} audio frames, \
         {} bytes in → {} bytes out, encoding time {:.2}s, speed {:.2}×",
        counters.video_frames,
        counters.audio_frames,
        counters.bytes_in,
        counters.bytes_out,
        encoding_time,
        speed_factor
    );

    let output_path_text = match output_path.to_str() {
        Some(text) => String::from(text),
        None => output_path.display().to_string(),
    };

    Ok(TranscodeOutput {
        output_path: output_path_text,
        file_size,
        duration: duration_approx,
        video_bitrate,
        audio_bitrate,
        encoding_time,
        speed_factor,
    })
}
}
/// Moves every packet from `demuxer` to `muxer` until end of stream.
///
/// Packets flagged by `should_discard` are skipped. Packets whose stream
/// index appears in `audio_stream_indices` are counted as audio and, when
/// `gain_db` exceeds 0.01 dB in magnitude, have their payload rewritten by
/// `apply_i16_gain`; all other packets are counted as video. The progress
/// tracker argument is currently unused.
///
/// NOTE(review): the gain is applied to the raw payload bytes regardless of
/// the stream's codec — verify callers only enable it for 16-bit PCM audio.
///
/// # Errors
///
/// `TranscodeError::ContainerError` when reading a packet fails for a reason
/// other than end of stream, or when the muxer rejects a packet.
async fn drain_packets_with_gain<D, M>(
    demuxer: &mut D,
    muxer: &mut M,
    _progress: &Option<ProgressTracker>,
    audio_stream_indices: &[usize],
    gain_db: f64,
) -> Result<PassStats>
where
    D: Demuxer,
    M: Muxer,
{
    let mut tally = PassStats::default();
    // dB to linear amplitude factor.
    let gain_linear = 10f64.powf(gain_db / 20.0) as f32;
    let apply_gain = gain_db.abs() > 0.01 && !audio_stream_indices.is_empty();

    loop {
        let mut pkt = match demuxer.read_packet().await {
            Ok(next) => next,
            Err(e) if e.is_eof() => break,
            Err(e) => {
                return Err(TranscodeError::ContainerError(format!(
                    "Error reading packet: {}",
                    e
                )));
            }
        };
        if pkt.should_discard() {
            continue;
        }

        tally.bytes_in += pkt.data.len() as u64;

        let is_audio = audio_stream_indices.contains(&pkt.stream_index);
        if is_audio {
            if apply_gain {
                pkt.data = apply_i16_gain(pkt.data, gain_linear);
            }
            tally.audio_frames += 1;
        } else {
            tally.video_frames += 1;
        }

        tally.bytes_out += pkt.data.len() as u64;

        muxer
            .write_packet(&pkt)
            .await
            .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;

        let copied = tally.video_frames + tally.audio_frames;
        if copied % 500 == 0 {
            debug!(
                "Remuxed {} packets ({} video, {} audio)",
                copied, tally.video_frames, tally.audio_frames
            );
        }
    }

    debug!(
        "drain_packets_with_gain: {} video frames, {} audio frames, \
         {} bytes in, {} bytes out",
        tally.video_frames, tally.audio_frames, tally.bytes_in, tally.bytes_out
    );
    Ok(tally)
}
/// Scales a buffer of little-endian 16-bit samples by `gain_linear`.
///
/// Each complete 2-byte pair is decoded, multiplied, clamped to the `i16`
/// range and written back. A trailing odd byte is left untouched. When the
/// gain is within `f32::EPSILON` of 1.0 the input is returned as is.
fn apply_i16_gain(data: bytes::Bytes, gain_linear: f32) -> bytes::Bytes {
    let is_unity = (gain_linear - 1.0).abs() < f32::EPSILON;
    if is_unity {
        return data;
    }

    let mut pcm: Vec<u8> = data.into();
    for pair in pcm.chunks_exact_mut(2) {
        let scaled = f32::from(i16::from_le_bytes([pair[0], pair[1]])) * gain_linear;
        // Saturate instead of wrapping when the product leaves the i16 range.
        let clipped = scaled.clamp(f32::from(i16::MIN), f32::from(i16::MAX)) as i16;
        pair.copy_from_slice(&clipped.to_le_bytes());
    }
    bytes::Bytes::from(pcm)
}
async fn count_packets<D: Demuxer>(demuxer: &mut D) -> u64 {
let mut count: u64 = 0;
loop {
match demuxer.read_packet().await {
Ok(_) => count += 1,
Err(e) if e.is_eof() => break,
Err(_) => break,
}
}
count
}
async fn feed_audio_packets_to_meter<D: Demuxer>(demuxer: &mut D, meter: &mut LoudnessMeter) {
let audio_stream_indices: Vec<usize> = demuxer
.streams()
.iter()
.filter(|s| s.is_audio())
.map(|s| s.index)
.collect();
loop {
match demuxer.read_packet().await {
Ok(pkt) => {
if !audio_stream_indices.contains(&pkt.stream_index) {
continue;
}
let samples = bytes_as_f32_samples(&pkt.data);
if !samples.is_empty() {
meter.process_f32(&samples);
}
}
Err(e) if e.is_eof() => break,
Err(_) => break,
}
}
}
/// Reinterprets `data` as consecutive little-endian `f32` values.
///
/// Bytes are consumed in groups of four; up to three trailing bytes that do
/// not form a full group are ignored. The bit patterns are taken as they are,
/// so the result may contain NaN or infinite values.
fn bytes_as_f32_samples(data: &[u8]) -> Vec<f32> {
    data.chunks_exact(4)
        .map(|word| f32::from_le_bytes([word[0], word[1], word[2], word[3]]))
        .collect()
}
/// Estimates a duration in seconds from a file size alone.
///
/// The estimate divides the size by a fixed rate of 625 000 bytes per second;
/// it is a rough heuristic, not a value read from the media.
fn derive_duration_approx(file_size: u64) -> f64 {
    const ASSUMED_BYTE_RATE: f64 = 625_000.0;
    let size_bytes = file_size as f64;
    size_bytes / ASSUMED_BYTE_RATE
}
/// Reusable transcode job description; each `execute` call runs a fresh [`Pipeline`] from it.
pub struct TranscodePipeline {
// Configuration cloned into a new `Pipeline` on every `execute` call.
config: PipelineConfig,
}
impl TranscodePipeline {
    /// Returns a builder with default settings.
    #[must_use]
    pub fn builder() -> TranscodePipelineBuilder {
        TranscodePipelineBuilder::new()
    }

    /// Replaces the requested video codec.
    pub fn set_video_codec(&mut self, codec: &str) {
        self.config.video_codec = Some(String::from(codec));
    }

    /// Replaces the requested audio codec.
    pub fn set_audio_codec(&mut self, codec: &str) {
        self.config.audio_codec = Some(String::from(codec));
    }

    /// Runs the job on a fresh [`Pipeline`] built from a copy of the configuration.
    ///
    /// # Errors
    ///
    /// Returns whatever `Pipeline::execute` reports.
    pub async fn execute(&mut self) -> crate::Result<TranscodeOutput> {
        let config = self.config.clone();
        Pipeline::new(config).execute().await
    }
}
/// Builder for [`TranscodePipeline`]; `input` and `output` are mandatory, everything else is optional.
pub struct TranscodePipelineBuilder {
// Required: path of the file to read.
input: Option<PathBuf>,
// Required: path of the file to write.
output: Option<PathBuf>,
// Optional requested video codec.
video_codec: Option<String>,
// Optional requested audio codec.
audio_codec: Option<String>,
// Optional quality settings.
quality: Option<QualityConfig>,
// Optional multi-pass mode; turned into a `MultiPassConfig` by `build`.
multipass: Option<MultiPassMode>,
// Optional loudness normalization settings.
normalization: Option<NormalizationConfig>,
// Progress tracking flag; `new` sets it to false.
track_progress: bool,
// Hardware acceleration flag; `new` sets it to true.
hw_accel: bool,
}
impl TranscodePipelineBuilder {
    /// Creates a builder with nothing set, progress tracking off and
    /// hardware acceleration on.
    #[must_use]
    pub fn new() -> Self {
        Self {
            hw_accel: true,
            track_progress: false,
            normalization: None,
            multipass: None,
            quality: None,
            audio_codec: None,
            video_codec: None,
            output: None,
            input: None,
        }
    }

    /// Sets the input path (required).
    #[must_use]
    pub fn input(self, path: impl Into<PathBuf>) -> Self {
        Self {
            input: Some(path.into()),
            ..self
        }
    }

    /// Sets the output path (required).
    #[must_use]
    pub fn output(self, path: impl Into<PathBuf>) -> Self {
        Self {
            output: Some(path.into()),
            ..self
        }
    }

    /// Sets the requested video codec.
    #[must_use]
    pub fn video_codec(self, codec: impl Into<String>) -> Self {
        Self {
            video_codec: Some(codec.into()),
            ..self
        }
    }

    /// Sets the requested audio codec.
    #[must_use]
    pub fn audio_codec(self, codec: impl Into<String>) -> Self {
        Self {
            audio_codec: Some(codec.into()),
            ..self
        }
    }

    /// Sets the quality configuration.
    #[must_use]
    pub fn quality(self, quality: QualityConfig) -> Self {
        Self {
            quality: Some(quality),
            ..self
        }
    }

    /// Enables multi-pass processing with the given mode.
    #[must_use]
    pub fn multipass(self, mode: MultiPassMode) -> Self {
        Self {
            multipass: Some(mode),
            ..self
        }
    }

    /// Enables loudness normalization with the given configuration.
    #[must_use]
    pub fn normalization(self, config: NormalizationConfig) -> Self {
        Self {
            normalization: Some(config),
            ..self
        }
    }

    /// Turns progress tracking on or off.
    #[must_use]
    pub fn track_progress(self, enable: bool) -> Self {
        Self {
            track_progress: enable,
            ..self
        }
    }

    /// Turns hardware acceleration on or off.
    #[must_use]
    pub fn hw_accel(self, enable: bool) -> Self {
        Self {
            hw_accel: enable,
            ..self
        }
    }

    /// Finishes the builder.
    ///
    /// A configured multi-pass mode is turned into a `MultiPassConfig` that
    /// uses the fixed stats file `/tmp/transcode_stats.log`.
    ///
    /// # Errors
    ///
    /// * `TranscodeError::InvalidInput` when no input path was set.
    /// * `TranscodeError::InvalidOutput` when no output path was set.
    pub fn build(self) -> crate::Result<TranscodePipeline> {
        let Self {
            input,
            output,
            video_codec,
            audio_codec,
            quality,
            multipass,
            normalization,
            track_progress,
            hw_accel,
        } = self;

        let input = match input {
            Some(path) => path,
            None => {
                return Err(TranscodeError::InvalidInput(
                    "Input path not specified".to_string(),
                ))
            }
        };
        let output = match output {
            Some(path) => path,
            None => {
                return Err(TranscodeError::InvalidOutput(
                    "Output path not specified".to_string(),
                ))
            }
        };
        let multipass =
            multipass.map(|mode| MultiPassConfig::new(mode, "/tmp/transcode_stats.log"));

        let config = PipelineConfig {
            input,
            output,
            video_codec,
            audio_codec,
            quality,
            multipass,
            normalization,
            track_progress,
            hw_accel,
        };
        Ok(TranscodePipeline { config })
    }
}
impl Default for TranscodePipelineBuilder {
fn default() -> Self {
Self::new()
}
}
// Unit and integration-style tests for the builder, the byte helpers and the remux path.
#[cfg(test)]
mod tests {
use super::*;
// A fully specified builder yields a pipeline whose config mirrors the setters.
#[test]
fn test_pipeline_builder() {
let result = TranscodePipelineBuilder::new()
.input("/tmp/input.mkv")
.output("/tmp/output.mkv")
.video_codec("vp9")
.audio_codec("opus")
.track_progress(true)
.hw_accel(false)
.build();
assert!(result.is_ok());
let pipeline = result.expect("should succeed in test");
assert_eq!(pipeline.config.input, PathBuf::from("/tmp/input.mkv"));
assert_eq!(pipeline.config.output, PathBuf::from("/tmp/output.mkv"));
assert_eq!(pipeline.config.video_codec, Some("vp9".to_string()));
assert_eq!(pipeline.config.audio_codec, Some("opus".to_string()));
assert!(pipeline.config.track_progress);
assert!(!pipeline.config.hw_accel);
}
// Building without an input path must fail.
#[test]
fn test_pipeline_builder_missing_input() {
let result = TranscodePipelineBuilder::new()
.output("/tmp/output.mkv")
.build();
assert!(result.is_err());
}
// Building without an output path must fail.
#[test]
fn test_pipeline_builder_missing_output() {
let result = TranscodePipelineBuilder::new()
.input("/tmp/input.mkv")
.build();
assert!(result.is_err());
}
// A freshly constructed pipeline reports the Validation stage.
#[test]
fn test_pipeline_stage_flow() {
let config = PipelineConfig {
input: PathBuf::from("/tmp/input.mkv"),
output: PathBuf::from("/tmp/output.mkv"),
video_codec: None,
audio_codec: None,
quality: None,
multipass: None,
normalization: None,
track_progress: false,
hw_accel: true,
};
let pipeline = Pipeline::new(config);
assert!(matches!(
pipeline.current_stage(),
PipelineStage::Validation
));
}
// Extension-to-container mapping for the Matroska and Ogg families.
#[test]
fn test_output_format_from_path() {
assert!(matches!(
output_format_from_path(std::path::Path::new("out.mkv")),
ContainerFormat::Matroska
));
assert!(matches!(
output_format_from_path(std::path::Path::new("out.webm")),
ContainerFormat::Matroska
));
assert!(matches!(
output_format_from_path(std::path::Path::new("out.ogg")),
ContainerFormat::Ogg
));
}
// Empty input yields no samples.
#[test]
fn test_bytes_as_f32_samples_empty() {
let samples = bytes_as_f32_samples(&[]);
assert!(samples.is_empty());
}
// Seven bytes hold one complete 4-byte sample; the remaining three are ignored.
#[test]
fn test_bytes_as_f32_samples_partial() {
let data = [0u8; 7];
let samples = bytes_as_f32_samples(&data);
assert_eq!(samples.len(), 1);
}
// An all-zero word decodes to 0.0.
#[test]
fn test_bytes_as_f32_known_value() {
let data = [0x00u8, 0x00, 0x00, 0x00];
let samples = bytes_as_f32_samples(&data);
assert_eq!(samples.len(), 1);
assert_eq!(samples[0], 0.0f32);
}
// Unity gain returns the input bytes unchanged.
#[test]
fn test_apply_i16_gain_unity() {
let sample: i16 = 1234;
let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
let out = apply_i16_gain(raw.clone(), 1.0);
assert_eq!(&out[..], &raw[..]);
}
// A gain of 2.0 doubles an in-range sample.
#[test]
fn test_apply_i16_gain_double() {
let sample: i16 = 1000;
let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
let out = apply_i16_gain(raw, 2.0);
let result = i16::from_le_bytes([out[0], out[1]]);
assert_eq!(result, 2000);
}
// Positive overflow saturates at i16::MAX.
#[test]
fn test_apply_i16_gain_clamp_positive() {
let sample: i16 = i16::MAX;
let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
let out = apply_i16_gain(raw, 2.0);
let result = i16::from_le_bytes([out[0], out[1]]);
assert_eq!(result, i16::MAX);
}
// Negative overflow saturates at i16::MIN.
#[test]
fn test_apply_i16_gain_clamp_negative() {
let sample: i16 = i16::MIN;
let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
let out = apply_i16_gain(raw, 2.0);
let result = i16::from_le_bytes([out[0], out[1]]);
assert_eq!(result, i16::MIN);
}
// A gain of 0.5 halves the sample.
#[test]
fn test_apply_i16_gain_half() {
let sample: i16 = 2000;
let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
let out = apply_i16_gain(raw, 0.5);
let result = i16::from_le_bytes([out[0], out[1]]);
assert_eq!(result, 1000);
}
// With an odd byte count the complete sample is processed and the trailing byte is preserved.
#[test]
fn test_apply_i16_gain_odd_byte_length() {
let raw = bytes::Bytes::from(vec![0xFFu8, 0x7F, 0xAB]); let out = apply_i16_gain(raw, 2.0);
let result = i16::from_le_bytes([out[0], out[1]]);
assert_eq!(result, i16::MAX);
assert_eq!(out[2], 0xAB);
}
// Default counters are all zero.
#[test]
fn test_pass_stats_default() {
let stats = PassStats::default();
assert_eq!(stats.bytes_in, 0);
assert_eq!(stats.bytes_out, 0);
assert_eq!(stats.video_frames, 0);
assert_eq!(stats.audio_frames, 0);
}
// End-to-end: mux a small video-only Matroska file in memory, write it to the
// temp directory, run the pipeline built by the builder, and check the summary.
#[tokio::test]
async fn test_pipeline_execute_remux_produces_output() {
use oximedia_container::{
mux::{MatroskaMuxer, MuxerConfig},
Muxer, Packet, PacketFlags, StreamInfo,
};
use oximedia_core::{CodecId, Rational, Timestamp};
use oximedia_io::MemorySource;
let in_buf = MemorySource::new_writable(64 * 1024);
let mut muxer = MatroskaMuxer::new(in_buf, MuxerConfig::new());
let mut video = StreamInfo::new(0, CodecId::Vp9, Rational::new(1, 1000));
video.codec_params.width = Some(320);
video.codec_params.height = Some(240);
muxer.add_stream(video).expect("add stream");
muxer.write_header().await.expect("write header");
// 30 small keyframe packets, 33 timebase ticks apart.
for i in 0u64..30 {
let data = vec![0x42u8, 0x00, (i & 0xFF) as u8, 0x01];
let pkt = Packet::new(
0,
bytes::Bytes::from(data),
Timestamp::new(i as i64 * 33, Rational::new(1, 1000)),
PacketFlags::KEYFRAME,
);
muxer.write_packet(&pkt).await.expect("write packet");
}
muxer.write_trailer().await.expect("write trailer");
let tmp_dir = std::env::temp_dir();
let input_path = tmp_dir.join("pipeline_test_input.mkv");
let output_path = tmp_dir.join("pipeline_test_output.mkv");
let sink = muxer.into_sink();
let mkv_bytes = sink.written_data().to_vec();
tokio::fs::write(&input_path, &mkv_bytes)
.await
.expect("write temp input");
let mut pipeline = TranscodePipelineBuilder::new()
.input(input_path.clone())
.output(output_path.clone())
.build()
.expect("build pipeline");
let result = pipeline.execute().await;
// Remove the temp files before asserting so a failure does not leave them behind.
let _ = tokio::fs::remove_file(&input_path).await;
let _ = tokio::fs::remove_file(&output_path).await;
let output = result.expect("pipeline execute should succeed");
assert!(
output.file_size > 0,
"output file size must be > 0, got {}",
output.file_size
);
assert!(
output.encoding_time >= 0.0,
"encoding time must be non-negative"
);
}
// Single-pass copy of an audio-only file with a manually preset gain; the
// analysis stage is bypassed by calling `execute_single_pass` directly.
#[tokio::test]
async fn test_pipeline_execute_with_normalization_gain() {
use crate::{LoudnessStandard, NormalizationConfig};
use oximedia_container::{
mux::{MatroskaMuxer, MuxerConfig},
Muxer, Packet, PacketFlags, StreamInfo,
};
use oximedia_core::{CodecId, Rational, Timestamp};
use oximedia_io::MemorySource;
let in_buf = MemorySource::new_writable(64 * 1024);
let mut muxer = MatroskaMuxer::new(in_buf, MuxerConfig::new());
let mut audio = StreamInfo::new(0, CodecId::Opus, Rational::new(1, 48000));
audio.codec_params.sample_rate = Some(48000);
audio.codec_params.channels = Some(2);
muxer.add_stream(audio).expect("add audio stream");
muxer.write_header().await.expect("write header");
// 20 packets, each carrying 16 little-endian i16 samples of value 100.
for i in 0u64..20 {
let sample_le: i16 = 100;
let mut data = Vec::with_capacity(32);
for _ in 0..16 {
data.extend_from_slice(&sample_le.to_le_bytes());
}
let pkt = Packet::new(
0,
bytes::Bytes::from(data),
Timestamp::new(i as i64 * 960, Rational::new(1, 48000)),
PacketFlags::KEYFRAME,
);
muxer.write_packet(&pkt).await.expect("write audio packet");
}
muxer.write_trailer().await.expect("write trailer");
let tmp_dir = std::env::temp_dir();
let input_path = tmp_dir.join("pipeline_norm_input.mkv");
let output_path = tmp_dir.join("pipeline_norm_output.mkv");
let sink = muxer.into_sink();
let mkv_bytes = sink.written_data().to_vec();
tokio::fs::write(&input_path, &mkv_bytes)
.await
.expect("write temp input");
let norm_config = NormalizationConfig::new(LoudnessStandard::EbuR128);
let pipeline_config = PipelineConfig {
input: input_path.clone(),
output: output_path.clone(),
video_codec: None,
audio_codec: None,
quality: None,
multipass: None,
normalization: Some(norm_config),
track_progress: false,
hw_accel: false,
};
let mut pipeline_inner = Pipeline::new(pipeline_config);
// 6.0206 dB is approximately a linear factor of 2.
pipeline_inner.normalization_gain_db = 6.0206;
pipeline_inner.encode_start = Some(std::time::Instant::now());
pipeline_inner.current_stage = PipelineStage::Encode;
let pass_stats = pipeline_inner.execute_single_pass().await;
// Remove the temp files before asserting so a failure does not leave them behind.
let _ = tokio::fs::remove_file(&input_path).await;
let _ = tokio::fs::remove_file(&output_path).await;
let stats = pass_stats.expect("single-pass should succeed");
assert!(
stats.audio_frames > 0,
"must have processed at least one audio frame"
);
assert!(
stats.bytes_in > 0,
"must have read at least some bytes from input"
);
assert!(
stats.bytes_out > 0,
"must have written at least some bytes to output"
);
}
}