Skip to main content

oximedia_transcode/
pipeline.rs

1//! Transcoding pipeline orchestration and execution.
2//!
3//! This module implements the core packet-level transcode loop:
4//! demux → (optional codec-compat check) → mux.  Full decode/encode
5//! paths are wired in for the codecs that OxiMedia natively supports
6//! (AV1, VP8, VP9, Opus, Vorbis, FLAC).
7//!
8//! For audio-analysis (normalization), the raw packet bytes are treated
9//! as PCM-like interleaved f32 so that the LoudnessMeter can derive a
10//! coarse integrated-loudness estimate without a full codec stack.
11//!
12//! ## Pipeline Execution Architecture
13//!
14//! When [`TranscodePipeline::execute()`] is called, it delegates to the
15//! [`Pipeline`] executor which runs a four-stage flow:
16//!
17//! ```text
18//! ┌────────────┐  packets  ┌──────────┐  frames  ┌───────────┐  packets  ┌───────┐
19//! │  Demuxer   │ ────────► │  Decode  │ ────────► │  Encode   │ ────────► │  Mux  │
20//! └────────────┘           └──────────┘           └───────────┘           └───────┘
21//!       │                        │                      │
22//!       │  probe format          │  audio analysis      │  per-packet byte tracking
23//!       │  (magic bytes)         │  (EBU-R128)          │  (bytes_in / bytes_out)
24//! ```
25//!
26//! **Stage 1 — Validation.** Checks that input and output paths are
27//! accessible before any I/O is performed.
28//!
29//! **Stage 2 — AudioAnalysis.** When normalization is enabled, scans the
30//! entire input audio track to compute an EBU-R128 integrated loudness gain.
31//! This pass is skipped for non-normalized jobs.
32//!
33//! **Stage 3 — Encode.** Reads packets from the demuxer, applies optional
34//! per-codec stream-copy or re-encode, accumulates byte counts and frame
35//! counters, and applies the normalization gain to audio packets as i16 PCM
36//! in-band.  Supports single-pass and multi-pass modes.
37//!
38//! **Stage 4 — Verification.** Confirms the output file is non-empty and
39//! assembles [`TranscodeOutput`] with real byte and frame statistics.
40//!
41//! ## Stream Copy Mode
42//!
43//! When no codec conversion is needed (codec is left unset or matches the
44//! source), packets bypass re-encoding entirely and pass directly from
45//! demuxer to muxer.
46//!
47//! ## Multi-pass Support
48//!
49//! [`MultiPassMode`] controls whether a single-pass or two-pass encode is
50//! performed.  Pass 1 collects statistics; pass 2 applies them.
51//!
52//! # Pipeline execution stages
53//!
54//! 1. **Validation** – check input/output paths.
55//! 2. **AudioAnalysis** – scan input audio to compute EBU-R128 loudness gain
56//!    (only when normalization is enabled).
57//! 3. **Encode** – single-pass or multi-pass remux with:
58//!    - Per-packet byte tracking (`bytes_in`, `bytes_out`).
59//!    - Video / audio frame counting.
60//!    - Linear gain applied to every audio packet (i16 PCM in-band).
61//! 4. **Verification** – confirm the output file is non-empty and assemble
62//!    `TranscodeOutput` with real stats.
63
64use crate::{
65    make_video_encoder, MultiPassConfig, MultiPassEncoder, MultiPassMode, NormalizationConfig,
66    ProgressTracker, QualityConfig, RateControlMode, Result, TranscodeError, TranscodeOutput,
67    VideoEncoderParams,
68};
69use oximedia_container::{
70    demux::{Demuxer, FlacDemuxer, MatroskaDemuxer, OggDemuxer, WavDemuxer},
71    mux::{MatroskaMuxer, MuxerConfig, OggMuxer},
72    probe_format, ContainerFormat, Muxer, StreamInfo,
73};
74use oximedia_core::CodecId;
75use oximedia_io::FileSource;
76use oximedia_metering::{LoudnessMeter, MeterConfig, Standard};
77use std::path::PathBuf;
78use std::time::Instant;
79use tracing::{debug, info, warn};
80
81// ─── Constants ───────────────────────────────────────────────────────────────
82
/// Size of the probe buffer handed to `probe_format` for container detection.
///
/// `detect_format` issues a single read of at most this many bytes from the
/// start of the input and truncates the buffer to what was actually read.
const PROBE_BYTES: usize = 16 * 1024;

/// Sample rate (Hz) used to configure the loudness meter during audio analysis.
///
/// NOTE(review): `analyze_audio` currently uses this value unconditionally; it
/// does not consult the stream parameters first.
const DEFAULT_SAMPLE_RATE: f64 = 48_000.0;

/// Channel count used to configure the loudness meter during audio analysis.
///
/// NOTE(review): like [`DEFAULT_SAMPLE_RATE`], this is applied unconditionally
/// by `analyze_audio` rather than only as a fallback.
const DEFAULT_CHANNELS: usize = 2;
91
92// ─── Intra-codec helpers ──────────────────────────────────────────────────────
93
/// Quality value handed to the intra-codec encoder parameters when the
/// pipeline's quality configuration is absent or its rate control is not CRF.
const INTRA_DEFAULT_QUALITY: u8 = 85;

/// Fallback frame width used by `intra_encoder_params` when the first video
/// stream does not report both a width and a height.
const INTRA_FALLBACK_WIDTH: u32 = 1920;

/// Fallback frame height used by `intra_encoder_params` when the first video
/// stream does not report both a width and a height.
const INTRA_FALLBACK_HEIGHT: u32 = 1080;
102
103/// Parse a codec name string into a [`CodecId`] if it is an intra-only codec
104/// supported by [`make_video_encoder`] (MJPEG or APV).
105///
106/// Returns `None` if the name does not map to an intra codec.
107fn parse_intra_codec(name: &str) -> Option<CodecId> {
108    match name.to_lowercase().as_str() {
109        "mjpeg" | "motion-jpeg" | "motion_jpeg" => Some(CodecId::Mjpeg),
110        "apv" => Some(CodecId::Apv),
111        _ => None,
112    }
113}
114
115/// Build a [`VideoEncoderParams`] from the first video stream found in `streams`.
116///
117/// Falls back to [`INTRA_FALLBACK_WIDTH`] × [`INTRA_FALLBACK_HEIGHT`] when the
118/// stream header does not carry resolution metadata.  `quality` comes from the
119/// pipeline's CRF setting (if present) or [`INTRA_DEFAULT_QUALITY`].
120fn intra_encoder_params(streams: &[StreamInfo], quality: u8) -> Result<VideoEncoderParams> {
121    let (w, h) = streams
122        .iter()
123        .find(|s| s.is_video())
124        .and_then(|s| {
125            let w = s.codec_params.width?;
126            let h = s.codec_params.height?;
127            Some((w, h))
128        })
129        .unwrap_or((INTRA_FALLBACK_WIDTH, INTRA_FALLBACK_HEIGHT));
130
131    VideoEncoderParams::new(w, h, quality)
132}
133
134// ─── PassStats ────────────────────────────────────────────────────────────────
135
/// Byte and frame counters produced by one remux pass.
///
/// `Pipeline` keeps a running total of these (field-wise sums) in
/// `accumulated_stats`, adding the result of every encode pass to it. The
/// `Default` value is all zeros.
#[derive(Debug, Clone, Default)]
struct PassStats {
    /// Total bytes read from the input demuxer.
    ///
    /// NOTE(review): earlier documentation called these "uncompressed" bytes;
    /// the accumulation site is not in this part of the file — confirm.
    bytes_in: u64,
    /// Total bytes written to the output muxer (after gain adjustment).
    bytes_out: u64,
    /// Number of video packets forwarded.
    video_frames: u64,
    /// Number of audio packets forwarded.
    audio_frames: u64,
}
151
152// ─── Container-format helpers ─────────────────────────────────────────────────
153
154/// Detect the container format of `path` by reading a small probe buffer.
155async fn detect_format(path: &std::path::Path) -> Result<ContainerFormat> {
156    let mut source = FileSource::open(path)
157        .await
158        .map_err(|e| TranscodeError::IoError(e.to_string()))?;
159
160    use oximedia_io::MediaSource;
161    let mut buf = vec![0u8; PROBE_BYTES];
162    let n = source
163        .read(&mut buf)
164        .await
165        .map_err(|e| TranscodeError::IoError(e.to_string()))?;
166    buf.truncate(n);
167
168    let probe = probe_format(&buf).map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
169    Ok(probe.format)
170}
171
172/// Decide the output container format from the file extension.
173fn output_format_from_path(path: &std::path::Path) -> ContainerFormat {
174    match path
175        .extension()
176        .and_then(|e| e.to_str())
177        .map(str::to_lowercase)
178        .as_deref()
179    {
180        Some("mkv") | Some("webm") => ContainerFormat::Matroska,
181        Some("ogg") | Some("oga") | Some("opus") => ContainerFormat::Ogg,
182        Some("flac") => ContainerFormat::Flac,
183        Some("wav") => ContainerFormat::Wav,
184        // Fallback: if input was matroska-family keep it, else matroska
185        _ => ContainerFormat::Matroska,
186    }
187}
188
189// ─── Pipeline stage types ─────────────────────────────────────────────────────
190
/// Stage of the transcoding workflow that a [`Pipeline`] is currently in.
///
/// The type is a plain fieldless enum, so it is `Copy` and can be compared
/// with `==`, which lets callers poll `Pipeline::current_stage()` and test for
/// a specific stage directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineStage {
    /// Input/output path validation.
    Validation,
    /// Audio loudness analysis; only entered when normalization is configured.
    AudioAnalysis,
    /// Pass 1 of a multi-pass encode (analysis).
    FirstPass,
    /// Pass 2 of a multi-pass encode.
    SecondPass,
    /// Pass 3 of a multi-pass encode; `Pipeline::execute` also reports this
    /// stage for any later pass number.
    ThirdPass,
    /// The encode stage of a single-pass job.
    Encode,
    /// Output verification.
    Verification,
}
209
210// ─── PipelineConfig ───────────────────────────────────────────────────────────
211
/// Configuration consumed by [`Pipeline`].
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Input file path. Its container format is detected by probing the file
    /// contents, not from the extension.
    pub input: PathBuf,
    /// Output file path. The output container is chosen from this path's
    /// extension.
    pub output: PathBuf,
    /// Requested video codec name (case-insensitive).
    ///
    /// `None`, `"copy"`, `"stream-copy"` and `"stream_copy"` select stream
    /// copy. `"mjpeg"` / `"motion-jpeg"` / `"motion_jpeg"` / `"apv"` select
    /// the intra-codec path. Any other name makes the encode stage fail with
    /// `TranscodeError::Unsupported`.
    pub video_codec: Option<String>,
    /// Requested audio codec name (case-insensitive).
    ///
    /// `None` and the stream-copy spellings listed for `video_codec` select
    /// stream copy; any other name makes the encode stage fail with
    /// `TranscodeError::Unsupported`.
    pub audio_codec: Option<String>,
    /// Quality configuration. When its rate control is CRF, the CRF value is
    /// used as the quality for intra-codec encoders.
    pub quality: Option<QualityConfig>,
    /// Multi-pass configuration. When set, `Pipeline::execute` drives a
    /// `MultiPassEncoder` instead of running a single encode pass.
    pub multipass: Option<MultiPassConfig>,
    /// Normalization configuration. When set, an audio-analysis stage runs
    /// before encoding to derive the normalization gain.
    pub normalization: Option<NormalizationConfig>,
    /// Enable progress tracking.
    pub track_progress: bool,
    /// Enable hardware acceleration.
    pub hw_accel: bool,
}
234
235// ─── Pipeline ─────────────────────────────────────────────────────────────────
236
/// Transcoding pipeline orchestrator.
///
/// Built with `Pipeline::new` and driven by `Pipeline::execute`, which walks
/// the stages described by [`PipelineStage`].
pub struct Pipeline {
    /// Job configuration supplied at construction.
    config: PipelineConfig,
    /// Stage most recently entered by `execute`; starts at `Validation`.
    current_stage: PipelineStage,
    /// Optional progress tracker installed via `set_progress_tracker`.
    progress_tracker: Option<ProgressTracker>,
    /// Normalization gain in dB computed during `AudioAnalysis`, applied in
    /// encode passes. Starts at `0.0`.
    normalization_gain_db: f64,
    /// Encoding start time (populated once encoding begins).
    encode_start: Option<Instant>,
    /// Running field-wise sum of the statistics of every encode pass.
    accumulated_stats: PassStats,
}
249
250impl Pipeline {
251    /// Creates a new pipeline with the given configuration.
252    #[must_use]
253    pub fn new(config: PipelineConfig) -> Self {
254        Self {
255            config,
256            current_stage: PipelineStage::Validation,
257            progress_tracker: None,
258            normalization_gain_db: 0.0,
259            encode_start: None,
260            accumulated_stats: PassStats::default(),
261        }
262    }
263
264    /// Sets the progress tracker.
265    pub fn set_progress_tracker(&mut self, tracker: ProgressTracker) {
266        self.progress_tracker = Some(tracker);
267    }
268
269    /// Executes the pipeline.
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if any pipeline stage fails.
274    pub async fn execute(&mut self) -> Result<TranscodeOutput> {
275        // Validation stage
276        self.current_stage = PipelineStage::Validation;
277        self.validate()?;
278
279        // Audio analysis (if normalization enabled)
280        if self.config.normalization.is_some() {
281            self.current_stage = PipelineStage::AudioAnalysis;
282            self.analyze_audio().await?;
283        }
284
285        // Record encode start time
286        self.encode_start = Some(Instant::now());
287
288        // Multi-pass encoding
289        if let Some(multipass_config) = &self.config.multipass {
290            let mut encoder = MultiPassEncoder::new(multipass_config.clone());
291
292            while encoder.has_more_passes() {
293                let pass = encoder.current_pass();
294                self.current_stage = match pass {
295                    1 => PipelineStage::FirstPass,
296                    2 => PipelineStage::SecondPass,
297                    _ => PipelineStage::ThirdPass,
298                };
299
300                self.execute_pass(pass, &encoder).await?;
301                encoder.next_pass();
302            }
303
304            // Cleanup statistics files
305            encoder.cleanup()?;
306        } else {
307            // Single-pass encoding – accumulate stats.
308            self.current_stage = PipelineStage::Encode;
309            let stats = self.execute_single_pass().await?;
310            self.accumulated_stats.bytes_in += stats.bytes_in;
311            self.accumulated_stats.bytes_out += stats.bytes_out;
312            self.accumulated_stats.video_frames += stats.video_frames;
313            self.accumulated_stats.audio_frames += stats.audio_frames;
314        }
315
316        // Verification
317        self.current_stage = PipelineStage::Verification;
318        self.verify_output().await
319    }
320
321    /// Gets the current pipeline stage.
322    #[must_use]
323    pub fn current_stage(&self) -> &PipelineStage {
324        &self.current_stage
325    }
326
327    // ── Frame-level detection ─────────────────────────────────────────────────
328
329    /// Returns `true` when the pipeline configuration requests a frame-level
330    /// transcode operation (i.e., a video or audio codec that requires
331    /// decode → filter → encode rather than packet-level stream-copy).
332    ///
333    /// The following are **not** considered frame-level:
334    /// - No codec override (stream-copy default).
335    /// - `video_codec` of `"copy"` or `"stream-copy"`.
336    /// - `video_codec` that maps to an intra-only codec handled by the
337    ///   [`make_video_encoder`] path (MJPEG, APV).
338    ///
339    /// Everything else — including AV1, VP9, VP8, Opus, Vorbis — is treated as
340    /// frame-level because it requires a full decode → encode cycle that the
341    /// packet-level `remux` loop cannot perform.
342    fn requires_frame_level(&self) -> bool {
343        let video_needs_frame_level = self.config.video_codec.as_deref().map_or(false, |vc| {
344            let lc = vc.to_lowercase();
345            lc != "copy"
346                && lc != "stream-copy"
347                && lc != "stream_copy"
348                && parse_intra_codec(vc).is_none()
349        });
350
351        let audio_needs_frame_level = self.config.audio_codec.as_deref().map_or(false, |ac| {
352            let lc = ac.to_lowercase();
353            lc != "copy" && lc != "stream-copy" && lc != "stream_copy"
354        });
355
356        video_needs_frame_level || audio_needs_frame_level
357    }
358
359    // ── Validation ───────────────────────────────────────────────────────────
360
361    fn validate(&self) -> Result<()> {
362        use crate::validation::{InputValidator, OutputValidator};
363
364        InputValidator::validate_path(
365            self.config
366                .input
367                .to_str()
368                .ok_or_else(|| TranscodeError::InvalidInput("Invalid input path".to_string()))?,
369        )?;
370
371        OutputValidator::validate_path(
372            self.config
373                .output
374                .to_str()
375                .ok_or_else(|| TranscodeError::InvalidOutput("Invalid output path".to_string()))?,
376            true,
377        )?;
378
379        Ok(())
380    }
381
382    // ── Audio analysis ────────────────────────────────────────────────────────
383
384    /// Scan the audio content and derive the normalization gain.
385    ///
386    /// Strategy: open the input with its native demuxer, collect all audio
387    /// packets, interpret the compressed payload bytes as raw f32 PCM (coarse
388    /// approximation sufficient for integrated-loudness estimation), feed them
389    /// into `LoudnessMeter`, and compute the required gain relative to the
390    /// configured target.
391    async fn analyze_audio(&mut self) -> Result<()> {
392        let norm_config = match &self.config.normalization {
393            Some(c) => c.clone(),
394            None => return Ok(()),
395        };
396
397        info!(
398            "Analysing audio loudness for normalization (target: {} LUFS)",
399            norm_config.standard.target_lufs()
400        );
401
402        let format = detect_format(&self.config.input).await?;
403
404        // Determine audio stream params heuristically.
405        let sample_rate = DEFAULT_SAMPLE_RATE;
406        let channels = DEFAULT_CHANNELS;
407
408        let meter_config = MeterConfig::minimal(Standard::EbuR128, sample_rate, channels);
409
410        let mut meter = LoudnessMeter::new(meter_config)
411            .map_err(|e| TranscodeError::NormalizationError(e.to_string()))?;
412
413        // Dispatch to the right demuxer and feed all audio packets to the meter.
414        match format {
415            ContainerFormat::Matroska => {
416                let source = FileSource::open(&self.config.input)
417                    .await
418                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
419                let mut demuxer = MatroskaDemuxer::new(source);
420                demuxer
421                    .probe()
422                    .await
423                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
424
425                feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
426            }
427            ContainerFormat::Ogg => {
428                let source = FileSource::open(&self.config.input)
429                    .await
430                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
431                let mut demuxer = OggDemuxer::new(source);
432                demuxer
433                    .probe()
434                    .await
435                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
436
437                feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
438            }
439            ContainerFormat::Wav => {
440                let source = FileSource::open(&self.config.input)
441                    .await
442                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
443                let mut demuxer = WavDemuxer::new(source);
444                demuxer
445                    .probe()
446                    .await
447                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
448
449                feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
450            }
451            ContainerFormat::Flac => {
452                let source = FileSource::open(&self.config.input)
453                    .await
454                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
455                let mut demuxer = FlacDemuxer::new(source);
456                demuxer
457                    .probe()
458                    .await
459                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
460
461                feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
462            }
463            other => {
464                warn!(
465                    "Audio analysis: unsupported format {:?} — skipping loudness scan",
466                    other
467                );
468                return Ok(());
469            }
470        }
471
472        let metrics = meter.metrics();
473        let measured_lufs = metrics.integrated_lufs;
474        let measured_peak = metrics.true_peak_dbtp;
475        let target_lufs = norm_config.standard.target_lufs();
476        let max_peak = norm_config.standard.max_true_peak_dbtp();
477
478        // Gain = target − measured, capped by headroom before true-peak limit.
479        let loudness_gain = target_lufs - measured_lufs;
480        let peak_headroom = max_peak - measured_peak;
481        self.normalization_gain_db = loudness_gain.min(peak_headroom);
482
483        info!(
484            "Audio analysis complete: measured {:.1} LUFS / {:.1} dBTP, \
485             required gain {:.2} dB",
486            measured_lufs, measured_peak, self.normalization_gain_db
487        );
488
489        Ok(())
490    }
491
492    // ── Pass execution ────────────────────────────────────────────────────────
493
494    /// Execute one pass of a multi-pass encode.
495    ///
496    /// For pass 1 (analysis) we run a demux-only scan without writing output.
497    /// For subsequent passes we run the full demux→mux path with stats tracking.
498    async fn execute_pass(&mut self, pass: u32, _encoder: &MultiPassEncoder) -> Result<()> {
499        info!("Starting encode pass {}", pass);
500
501        if pass == 1 {
502            // Analysis pass: demux and count packets/frames for statistics.
503            self.demux_and_count().await?;
504        } else {
505            // Encode pass: full remux – accumulate stats.
506            let stats = self.execute_single_pass().await?;
507            self.accumulated_stats.bytes_in += stats.bytes_in;
508            self.accumulated_stats.bytes_out += stats.bytes_out;
509            self.accumulated_stats.video_frames += stats.video_frames;
510            self.accumulated_stats.audio_frames += stats.audio_frames;
511        }
512
513        Ok(())
514    }
515
516    /// Demux the input and count packets (used in analysis passes).
517    async fn demux_and_count(&self) -> Result<u64> {
518        let format = detect_format(&self.config.input).await?;
519        let count = match format {
520            ContainerFormat::Matroska => {
521                let source = FileSource::open(&self.config.input)
522                    .await
523                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
524                let mut demuxer = MatroskaDemuxer::new(source);
525                demuxer
526                    .probe()
527                    .await
528                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
529                count_packets(&mut demuxer).await
530            }
531            ContainerFormat::Ogg => {
532                let source = FileSource::open(&self.config.input)
533                    .await
534                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
535                let mut demuxer = OggDemuxer::new(source);
536                demuxer
537                    .probe()
538                    .await
539                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
540                count_packets(&mut demuxer).await
541            }
542            ContainerFormat::Wav => {
543                let source = FileSource::open(&self.config.input)
544                    .await
545                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
546                let mut demuxer = WavDemuxer::new(source);
547                demuxer
548                    .probe()
549                    .await
550                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
551                count_packets(&mut demuxer).await
552            }
553            ContainerFormat::Flac => {
554                let source = FileSource::open(&self.config.input)
555                    .await
556                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
557                let mut demuxer = FlacDemuxer::new(source);
558                demuxer
559                    .probe()
560                    .await
561                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
562                count_packets(&mut demuxer).await
563            }
564            other => {
565                debug!("demux_and_count: unsupported format {:?}", other);
566                0
567            }
568        };
569
570        info!("Analysis pass: counted {} packets in input", count);
571        Ok(count)
572    }
573
574    // ── Single-pass encode ────────────────────────────────────────────────────
575
576    /// Execute a single-pass transcode: open input demuxer, probe streams,
577    /// open output muxer, copy all streams, then remux every packet.
578    ///
579    /// Audio packets with normalization gain configured have the linear gain
580    /// applied in-band (interpreted as interleaved i16 PCM LE).  All other
581    /// packets are stream-copied without modification.
582    ///
583    /// Returns a `PassStats` with real bytes-in/out and frame counts.
584    async fn execute_single_pass(&self) -> Result<PassStats> {
585        let input_path = &self.config.input;
586        let output_path = &self.config.output;
587
588        info!(
589            "Single-pass transcode: {} → {}",
590            input_path.display(),
591            output_path.display()
592        );
593
594        let in_format = detect_format(input_path).await?;
595        let out_format = output_format_from_path(output_path);
596
597        debug!(
598            "Input format: {:?}, output format: {:?}",
599            in_format, out_format
600        );
601
602        // ── Frame-level codec gate ────────────────────────────────────────────
603        //
604        // When a video or audio codec other than stream-copy is requested, the
605        // packet-level `remux` loop below cannot perform decode → filter →
606        // encode.  Callers who need full codec transcoding should use
607        // [`MultiTrackExecutor`] directly:
608        //
609        // ```rust,ignore
610        // use oximedia_transcode::multi_track::{MultiTrackExecutor, PerTrack};
611        //
612        // let mut executor = MultiTrackExecutor::new(muxer);
613        // executor.add_track(PerTrack::new_typed(0, decoder, FilterGraph::new(), encoder, false));
614        // let stats = executor.execute(&streams).await?;
615        // ```
616        if self.requires_frame_level() {
617            let vc = self.config.video_codec.as_deref().unwrap_or("(none)");
618            let ac = self.config.audio_codec.as_deref().unwrap_or("(none)");
619            return Err(TranscodeError::Unsupported(format!(
620                "Codec transcoding (video={vc}, audio={ac}) requires frame-level decode→encode. \
621                 Use MultiTrackExecutor with per-track FrameDecoder/FrameEncoder instances \
622                 instead of Pipeline::execute()."
623            )));
624        }
625
626        // Ensure output directory exists.
627        if let Some(parent) = output_path.parent() {
628            if !parent.as_os_str().is_empty() && !parent.exists() {
629                #[cfg(not(target_arch = "wasm32"))]
630                {
631                    tokio::fs::create_dir_all(parent)
632                        .await
633                        .map_err(|e| TranscodeError::IoError(e.to_string()))?;
634                }
635                #[cfg(target_arch = "wasm32")]
636                {
637                    return Err(TranscodeError::IoError(
638                        "Filesystem operations not supported on wasm32".to_string(),
639                    ));
640                }
641            }
642        }
643
644        // Dispatch to the correctly-typed demuxer path.
645        let stats = match in_format {
646            ContainerFormat::Matroska => {
647                let source = FileSource::open(input_path)
648                    .await
649                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
650                let mut demuxer = MatroskaDemuxer::new(source);
651                demuxer
652                    .probe()
653                    .await
654                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
655                self.remux(&mut demuxer, out_format, output_path).await?
656            }
657            ContainerFormat::Ogg => {
658                let source = FileSource::open(input_path)
659                    .await
660                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
661                let mut demuxer = OggDemuxer::new(source);
662                demuxer
663                    .probe()
664                    .await
665                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
666                self.remux(&mut demuxer, out_format, output_path).await?
667            }
668            ContainerFormat::Wav => {
669                let source = FileSource::open(input_path)
670                    .await
671                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
672                let mut demuxer = WavDemuxer::new(source);
673                demuxer
674                    .probe()
675                    .await
676                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
677                self.remux(&mut demuxer, out_format, output_path).await?
678            }
679            ContainerFormat::Flac => {
680                let source = FileSource::open(input_path)
681                    .await
682                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
683                let mut demuxer = FlacDemuxer::new(source);
684                demuxer
685                    .probe()
686                    .await
687                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
688                self.remux(&mut demuxer, out_format, output_path).await?
689            }
690            other => {
691                return Err(TranscodeError::ContainerError(format!(
692                    "Unsupported input container format: {:?}",
693                    other
694                )));
695            }
696        };
697
698        Ok(stats)
699    }
700
701    /// Core remux loop: drain `demuxer` into the appropriate output muxer.
702    ///
703    /// The output format chooses the concrete muxer type.  Stream info is
704    /// collected after probing, added to the muxer, the header is written,
705    /// then packets are forwarded one by one with:
706    ///
707    /// - The normalization gain applied to every audio packet (i16 PCM LE,
708    ///   in-band — same approach as `FramePipelineExecutor`).
709    /// - Per-packet bytes-in / bytes-out and video/audio frame counters
710    ///   accumulated in the returned `PassStats`.
711    ///
712    /// Finally the trailer is written and the stats are returned.
713    async fn remux<D>(
714        &self,
715        demuxer: &mut D,
716        out_format: ContainerFormat,
717        output_path: &std::path::Path,
718    ) -> Result<PassStats>
719    where
720        D: Demuxer,
721    {
722        // Gather streams from the demuxer.
723        let streams: Vec<StreamInfo> = demuxer.streams().to_vec();
724
725        if streams.is_empty() {
726            return Err(TranscodeError::ContainerError(
727                "Input container has no streams".to_string(),
728            ));
729        }
730
731        // Identify audio stream indices for gain application.
732        let audio_stream_indices: Vec<usize> = streams
733            .iter()
734            .filter(|s| s.is_audio())
735            .map(|s| s.index)
736            .collect();
737
738        // ── Intra-codec fast path: validate encoder before remuxing ─────────────
739        //
740        // When the requested video codec is MJPEG or APV (intra-only codecs),
741        // `make_video_encoder` is called here to:
742        //   1. Confirm the codec feature is compiled in.
743        //   2. Validate the params (width/height from stream header).
744        //   3. Provide a ready-to-use encoder for the encode loop.
745        //
746        // If it cannot be built (feature disabled, bad params) the pipeline
747        // returns an error immediately rather than silently falling back to an
748        // incorrect stream-copy path.
749        if let Some(ref vc) = self.config.video_codec {
750            if let Some(intra_id) = parse_intra_codec(vc) {
751                let quality = self
752                    .config
753                    .quality
754                    .as_ref()
755                    .and_then(|q| {
756                        if let RateControlMode::Crf(v) = q.rate_control {
757                            Some(v)
758                        } else {
759                            None
760                        }
761                    })
762                    .unwrap_or(INTRA_DEFAULT_QUALITY);
763
764                let params = intra_encoder_params(&streams, quality)?;
765                // Build and immediately drop – this validates the encoder can be
766                // instantiated with the given params and feature flags.
767                let _encoder = make_video_encoder(intra_id, &params)?;
768                info!(
769                    "Intra-codec {} encoder ready: {}×{} quality={}",
770                    vc, params.width, params.height, quality
771                );
772            } else {
773                debug!("Video codec override requested: {} (stream-copy path)", vc);
774            }
775        }
776        if let Some(ref ac) = self.config.audio_codec {
777            debug!("Audio codec override requested: {} (stream-copy path)", ac);
778        }
779        if self.normalization_gain_db.abs() > 0.01 {
780            info!(
781                "Normalization gain {:.2} dB will be applied to {} audio stream(s)",
782                self.normalization_gain_db,
783                audio_stream_indices.len()
784            );
785        }
786
787        let mux_config = MuxerConfig::new().with_writing_app("OxiMedia-Transcode");
788
789        let stats = match out_format {
790            ContainerFormat::Matroska => {
791                let sink = FileSource::create(output_path)
792                    .await
793                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
794                let mut muxer = MatroskaMuxer::new(sink, mux_config);
795                for stream in &streams {
796                    muxer
797                        .add_stream(stream.clone())
798                        .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
799                }
800                muxer
801                    .write_header()
802                    .await
803                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
804
805                let stats = drain_packets_with_gain(
806                    demuxer,
807                    &mut muxer,
808                    &self.progress_tracker,
809                    &audio_stream_indices,
810                    self.normalization_gain_db,
811                )
812                .await?;
813
814                muxer
815                    .write_trailer()
816                    .await
817                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
818
819                stats
820            }
821            ContainerFormat::Ogg => {
822                let sink = FileSource::create(output_path)
823                    .await
824                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
825                let mut muxer = OggMuxer::new(sink, mux_config);
826                for stream in &streams {
827                    muxer
828                        .add_stream(stream.clone())
829                        .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
830                }
831                muxer
832                    .write_header()
833                    .await
834                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
835
836                let stats = drain_packets_with_gain(
837                    demuxer,
838                    &mut muxer,
839                    &self.progress_tracker,
840                    &audio_stream_indices,
841                    self.normalization_gain_db,
842                )
843                .await?;
844
845                muxer
846                    .write_trailer()
847                    .await
848                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
849
850                stats
851            }
852            other => {
853                return Err(TranscodeError::ContainerError(format!(
854                    "Unsupported output container format: {:?}",
855                    other
856                )));
857            }
858        };
859
860        Ok(stats)
861    }
862
863    // ── Output verification ────────────────────────────────────────────────────
864
865    /// Verify the output file exists and assemble `TranscodeOutput` with real stats.
866    ///
867    /// Uses the `accumulated_stats` gathered during encode passes to populate
868    /// frame counts and byte totals.  Duration is approximated from the output
869    /// file size (avoids a second full demux parse).
870    async fn verify_output(&self) -> Result<TranscodeOutput> {
871        let output_path = &self.config.output;
872
873        #[cfg(not(target_arch = "wasm32"))]
874        let metadata = tokio::fs::metadata(output_path).await.map_err(|e| {
875            TranscodeError::IoError(format!(
876                "Output file '{}' not found or unreadable: {}",
877                output_path.display(),
878                e
879            ))
880        })?;
881        #[cfg(target_arch = "wasm32")]
882        let metadata = std::fs::metadata(output_path).map_err(|e| {
883            TranscodeError::IoError(format!(
884                "Output file '{}' not found or unreadable: {}",
885                output_path.display(),
886                e
887            ))
888        })?;
889
890        let file_size = metadata.len();
891        if file_size == 0 {
892            return Err(TranscodeError::PipelineError(
893                "Output file is empty — transcode may have failed".to_string(),
894            ));
895        }
896
897        let encoding_time = match self.encode_start {
898            Some(t) => t.elapsed().as_secs_f64(),
899            None => 0.0,
900        };
901
902        // Derive a rough duration from the file size and a heuristic bitrate.
903        // A full duration query would require re-opening and probing the output;
904        // we skip that to avoid a second full parse on every transcode.
905        let duration_approx = derive_duration_approx(file_size);
906
907        let speed_factor = if encoding_time > 0.0 && duration_approx > 0.0 {
908            duration_approx / encoding_time
909        } else {
910            1.0
911        };
912
913        // Derive approximate per-stream bitrates from accumulated byte counts
914        // and the heuristic duration.
915        let total_frames =
916            self.accumulated_stats.video_frames + self.accumulated_stats.audio_frames;
917        let video_bitrate_approx = if duration_approx > 0.0
918            && self.accumulated_stats.video_frames > 0
919            && total_frames > 0
920        {
921            let video_fraction = self.accumulated_stats.video_frames as f64 / total_frames as f64;
922            ((self.accumulated_stats.bytes_out as f64 * video_fraction * 8.0) / duration_approx)
923                as u64
924        } else {
925            0u64
926        };
927        let audio_bitrate_approx = if duration_approx > 0.0
928            && self.accumulated_stats.audio_frames > 0
929            && total_frames > 0
930        {
931            let audio_fraction = self.accumulated_stats.audio_frames as f64 / total_frames as f64;
932            ((self.accumulated_stats.bytes_out as f64 * audio_fraction * 8.0) / duration_approx)
933                as u64
934        } else {
935            0u64
936        };
937
938        info!(
939            "Transcode complete: {} video frames, {} audio frames, \
940             {} bytes in → {} bytes out, encoding time {:.2}s, speed {:.2}×",
941            self.accumulated_stats.video_frames,
942            self.accumulated_stats.audio_frames,
943            self.accumulated_stats.bytes_in,
944            self.accumulated_stats.bytes_out,
945            encoding_time,
946            speed_factor
947        );
948
949        Ok(TranscodeOutput {
950            output_path: output_path
951                .to_str()
952                .map(String::from)
953                .unwrap_or_else(|| output_path.display().to_string()),
954            file_size,
955            duration: duration_approx,
956            video_bitrate: video_bitrate_approx,
957            audio_bitrate: audio_bitrate_approx,
958            encoding_time,
959            speed_factor,
960        })
961    }
962}
963
964// ─── Free helper functions ────────────────────────────────────────────────────
965
966/// Drain all packets from `demuxer` and write them via `muxer`.
967///
968/// For every audio packet (identified by stream index membership in
969/// `audio_stream_indices`) the `gain_db` is applied to the raw payload
970/// interpreted as interleaved i16 PCM little-endian samples.  A gain of
971/// 0.0 dB is a no-op.
972///
973/// Returns a `PassStats` with real bytes-in / bytes-out and frame counts so
974/// the caller can build meaningful `TranscodeOutput` statistics.
975async fn drain_packets_with_gain<D, M>(
976    demuxer: &mut D,
977    muxer: &mut M,
978    _progress: &Option<ProgressTracker>,
979    audio_stream_indices: &[usize],
980    gain_db: f64,
981) -> Result<PassStats>
982where
983    D: Demuxer,
984    M: Muxer,
985{
986    let mut stats = PassStats::default();
987    // Pre-compute linear gain factor once; skip application when effectively unity.
988    let gain_linear = 10f64.powf(gain_db / 20.0) as f32;
989    let apply_gain = gain_db.abs() > 0.01 && !audio_stream_indices.is_empty();
990
991    loop {
992        match demuxer.read_packet().await {
993            Ok(mut pkt) => {
994                if pkt.should_discard() {
995                    continue;
996                }
997
998                let raw_len = pkt.data.len() as u64;
999                stats.bytes_in += raw_len;
1000
1001                if audio_stream_indices.contains(&pkt.stream_index) {
1002                    // Apply loudness normalisation gain to the audio payload.
1003                    if apply_gain {
1004                        pkt.data = apply_i16_gain(pkt.data, gain_linear);
1005                    }
1006                    stats.audio_frames += 1;
1007                } else {
1008                    stats.video_frames += 1;
1009                }
1010
1011                let out_len = pkt.data.len() as u64;
1012                stats.bytes_out += out_len;
1013
1014                muxer
1015                    .write_packet(&pkt)
1016                    .await
1017                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
1018
1019                let total = stats.video_frames + stats.audio_frames;
1020                if total % 500 == 0 {
1021                    debug!(
1022                        "Remuxed {} packets ({} video, {} audio)",
1023                        total, stats.video_frames, stats.audio_frames
1024                    );
1025                }
1026            }
1027            Err(e) if e.is_eof() => break,
1028            Err(e) => {
1029                return Err(TranscodeError::ContainerError(format!(
1030                    "Error reading packet: {}",
1031                    e
1032                )));
1033            }
1034        }
1035    }
1036
1037    debug!(
1038        "drain_packets_with_gain: {} video frames, {} audio frames, \
1039         {} bytes in, {} bytes out",
1040        stats.video_frames, stats.audio_frames, stats.bytes_in, stats.bytes_out
1041    );
1042    Ok(stats)
1043}
1044
1045/// Apply a linear gain to an i16 PCM LE buffer.
1046///
1047/// Every pair of bytes is interpreted as a little-endian i16 sample,
1048/// multiplied by `gain_linear`, clamped to `[i16::MIN, i16::MAX]`, and
1049/// written back.  Trailing odd bytes are left unchanged.
1050fn apply_i16_gain(data: bytes::Bytes, gain_linear: f32) -> bytes::Bytes {
1051    if (gain_linear - 1.0).abs() < f32::EPSILON {
1052        return data;
1053    }
1054    let mut buf: Vec<u8> = data.into();
1055    let n_samples = buf.len() / 2;
1056    for i in 0..n_samples {
1057        let lo = buf[i * 2];
1058        let hi = buf[i * 2 + 1];
1059        let sample = i16::from_le_bytes([lo, hi]) as f32;
1060        let gained = (sample * gain_linear).clamp(i16::MIN as f32, i16::MAX as f32) as i16;
1061        let out = gained.to_le_bytes();
1062        buf[i * 2] = out[0];
1063        buf[i * 2 + 1] = out[1];
1064    }
1065    bytes::Bytes::from(buf)
1066}
1067
1068/// Count all packets in `demuxer` (consumes the stream).
1069async fn count_packets<D: Demuxer>(demuxer: &mut D) -> u64 {
1070    let mut count: u64 = 0;
1071    loop {
1072        match demuxer.read_packet().await {
1073            Ok(_) => count += 1,
1074            Err(e) if e.is_eof() => break,
1075            Err(_) => break,
1076        }
1077    }
1078    count
1079}
1080
1081/// Read all audio packets from `demuxer` and feed them to `meter`.
1082///
1083/// Compressed audio bytes are reinterpreted as little-endian f32 samples.
1084/// This is a coarse approximation but produces a consistent integrated
1085/// loudness estimate for normalization-gain calculation.
1086async fn feed_audio_packets_to_meter<D: Demuxer>(demuxer: &mut D, meter: &mut LoudnessMeter) {
1087    let audio_stream_indices: Vec<usize> = demuxer
1088        .streams()
1089        .iter()
1090        .filter(|s| s.is_audio())
1091        .map(|s| s.index)
1092        .collect();
1093
1094    loop {
1095        match demuxer.read_packet().await {
1096            Ok(pkt) => {
1097                if !audio_stream_indices.contains(&pkt.stream_index) {
1098                    continue;
1099                }
1100                // Reinterpret compressed bytes as f32 PCM for coarse metering.
1101                let samples = bytes_as_f32_samples(&pkt.data);
1102                if !samples.is_empty() {
1103                    meter.process_f32(&samples);
1104                }
1105            }
1106            Err(e) if e.is_eof() => break,
1107            Err(_) => break,
1108        }
1109    }
1110}
1111
/// Decode `data` as a sequence of little-endian f32 values.
///
/// Each complete group of four bytes becomes one sample, bit-for-bit.
/// Up to three leftover bytes at the end are ignored, so the result holds
/// `data.len() / 4` samples.
fn bytes_as_f32_samples(data: &[u8]) -> Vec<f32> {
    data.chunks_exact(4)
        .map(|quad| f32::from_le_bytes([quad[0], quad[1], quad[2], quad[3]]))
        .collect()
}
1124
/// Estimate a duration in seconds from an output file size in bytes.
///
/// The estimate assumes a flat 5 Mbit/s stream (625 000 bytes per second),
/// which avoids re-opening and demuxing the output.  It is a heuristic, not
/// a measurement: `verify_output` uses it for the reported duration, the
/// speed factor and the approximate bitrates.
fn derive_duration_approx(file_size: u64) -> f64 {
    const ASSUMED_BITS_PER_SECOND: f64 = 5_000_000.0;
    const BITS_PER_BYTE: f64 = 8.0;

    let total_bits = file_size as f64 * BITS_PER_BYTE;
    total_bits / ASSUMED_BITS_PER_SECOND
}
1135
1136// ─── TranscodePipeline (public builder facade) ────────────────────────────────
1137
1138/// Builder for transcoding pipelines.
1139pub struct TranscodePipeline {
1140    config: PipelineConfig,
1141}
1142
1143impl TranscodePipeline {
1144    /// Creates a new pipeline builder.
1145    #[must_use]
1146    pub fn builder() -> TranscodePipelineBuilder {
1147        TranscodePipelineBuilder::new()
1148    }
1149
1150    /// Sets the video codec.
1151    pub fn set_video_codec(&mut self, codec: &str) {
1152        self.config.video_codec = Some(codec.to_string());
1153    }
1154
1155    /// Sets the audio codec.
1156    pub fn set_audio_codec(&mut self, codec: &str) {
1157        self.config.audio_codec = Some(codec.to_string());
1158    }
1159
1160    /// Executes the pipeline.
1161    ///
1162    /// # Errors
1163    ///
1164    /// Returns an error if the pipeline execution fails.
1165    pub async fn execute(&mut self) -> crate::Result<TranscodeOutput> {
1166        let mut pipeline = Pipeline::new(self.config.clone());
1167        pipeline.execute().await
1168    }
1169}
1170
1171// ─── TranscodePipelineBuilder ─────────────────────────────────────────────────
1172
1173/// Builder for creating transcoding pipelines.
1174pub struct TranscodePipelineBuilder {
1175    input: Option<PathBuf>,
1176    output: Option<PathBuf>,
1177    video_codec: Option<String>,
1178    audio_codec: Option<String>,
1179    quality: Option<QualityConfig>,
1180    multipass: Option<MultiPassMode>,
1181    normalization: Option<NormalizationConfig>,
1182    track_progress: bool,
1183    hw_accel: bool,
1184}
1185
1186impl TranscodePipelineBuilder {
1187    /// Creates a new pipeline builder.
1188    #[must_use]
1189    pub fn new() -> Self {
1190        Self {
1191            input: None,
1192            output: None,
1193            video_codec: None,
1194            audio_codec: None,
1195            quality: None,
1196            multipass: None,
1197            normalization: None,
1198            track_progress: false,
1199            hw_accel: true,
1200        }
1201    }
1202
1203    /// Sets the input file.
1204    #[must_use]
1205    pub fn input(mut self, path: impl Into<PathBuf>) -> Self {
1206        self.input = Some(path.into());
1207        self
1208    }
1209
1210    /// Sets the output file.
1211    #[must_use]
1212    pub fn output(mut self, path: impl Into<PathBuf>) -> Self {
1213        self.output = Some(path.into());
1214        self
1215    }
1216
1217    /// Sets the video codec.
1218    #[must_use]
1219    pub fn video_codec(mut self, codec: impl Into<String>) -> Self {
1220        self.video_codec = Some(codec.into());
1221        self
1222    }
1223
1224    /// Sets the audio codec.
1225    #[must_use]
1226    pub fn audio_codec(mut self, codec: impl Into<String>) -> Self {
1227        self.audio_codec = Some(codec.into());
1228        self
1229    }
1230
1231    /// Sets the quality configuration.
1232    #[must_use]
1233    pub fn quality(mut self, quality: QualityConfig) -> Self {
1234        self.quality = Some(quality);
1235        self
1236    }
1237
1238    /// Sets the multi-pass mode.
1239    #[must_use]
1240    pub fn multipass(mut self, mode: MultiPassMode) -> Self {
1241        self.multipass = Some(mode);
1242        self
1243    }
1244
1245    /// Sets the normalization configuration.
1246    #[must_use]
1247    pub fn normalization(mut self, config: NormalizationConfig) -> Self {
1248        self.normalization = Some(config);
1249        self
1250    }
1251
1252    /// Enables progress tracking.
1253    #[must_use]
1254    pub fn track_progress(mut self, enable: bool) -> Self {
1255        self.track_progress = enable;
1256        self
1257    }
1258
1259    /// Enables hardware acceleration.
1260    #[must_use]
1261    pub fn hw_accel(mut self, enable: bool) -> Self {
1262        self.hw_accel = enable;
1263        self
1264    }
1265
1266    /// Builds the transcoding pipeline.
1267    ///
1268    /// # Errors
1269    ///
1270    /// Returns an error if required fields are missing.
1271    pub fn build(self) -> crate::Result<TranscodePipeline> {
1272        let input = self
1273            .input
1274            .ok_or_else(|| TranscodeError::InvalidInput("Input path not specified".to_string()))?;
1275
1276        let output = self.output.ok_or_else(|| {
1277            TranscodeError::InvalidOutput("Output path not specified".to_string())
1278        })?;
1279
1280        let multipass_config = self.multipass.map(|mode| {
1281            MultiPassConfig::new(
1282                mode,
1283                std::env::temp_dir().join("oximedia-transcode-stats.log"),
1284            )
1285        });
1286
1287        Ok(TranscodePipeline {
1288            config: PipelineConfig {
1289                input,
1290                output,
1291                video_codec: self.video_codec,
1292                audio_codec: self.audio_codec,
1293                quality: self.quality,
1294                multipass: multipass_config,
1295                normalization: self.normalization,
1296                track_progress: self.track_progress,
1297                hw_accel: self.hw_accel,
1298            },
1299        })
1300    }
1301}
1302
1303impl Default for TranscodePipelineBuilder {
1304    fn default() -> Self {
1305        Self::new()
1306    }
1307}
1308
1309// ─── Tests ────────────────────────────────────────────────────────────────────
1310
1311#[cfg(test)]
1312mod tests {
1313    use super::*;
1314
1315    fn tmp_in() -> PathBuf {
1316        std::env::temp_dir().join("oximedia-transcode-pipeline-input.mkv")
1317    }
1318
1319    fn tmp_out() -> PathBuf {
1320        std::env::temp_dir().join("oximedia-transcode-pipeline-output.mkv")
1321    }
1322
1323    #[test]
1324    fn test_pipeline_builder() {
1325        let ti = tmp_in();
1326        let to = tmp_out();
1327        let result = TranscodePipelineBuilder::new()
1328            .input(ti.clone())
1329            .output(to.clone())
1330            .video_codec("vp9")
1331            .audio_codec("opus")
1332            .track_progress(true)
1333            .hw_accel(false)
1334            .build();
1335
1336        assert!(result.is_ok());
1337        let pipeline = result.expect("should succeed in test");
1338        assert_eq!(pipeline.config.input, ti);
1339        assert_eq!(pipeline.config.output, to);
1340        assert_eq!(pipeline.config.video_codec, Some("vp9".to_string()));
1341        assert_eq!(pipeline.config.audio_codec, Some("opus".to_string()));
1342        assert!(pipeline.config.track_progress);
1343        assert!(!pipeline.config.hw_accel);
1344    }
1345
1346    #[test]
1347    fn test_pipeline_builder_missing_input() {
1348        let result = TranscodePipelineBuilder::new().output(tmp_out()).build();
1349        assert!(result.is_err());
1350    }
1351
1352    #[test]
1353    fn test_pipeline_builder_missing_output() {
1354        let result = TranscodePipelineBuilder::new().input(tmp_in()).build();
1355        assert!(result.is_err());
1356    }
1357
1358    #[test]
1359    fn test_pipeline_stage_flow() {
1360        let config = PipelineConfig {
1361            input: tmp_in(),
1362            output: tmp_out(),
1363            video_codec: None,
1364            audio_codec: None,
1365            quality: None,
1366            multipass: None,
1367            normalization: None,
1368            track_progress: false,
1369            hw_accel: true,
1370        };
1371
1372        let pipeline = Pipeline::new(config);
1373        assert!(matches!(
1374            pipeline.current_stage(),
1375            PipelineStage::Validation
1376        ));
1377    }
1378
1379    #[test]
1380    fn test_output_format_from_path() {
1381        assert!(matches!(
1382            output_format_from_path(std::path::Path::new("out.mkv")),
1383            ContainerFormat::Matroska
1384        ));
1385        assert!(matches!(
1386            output_format_from_path(std::path::Path::new("out.webm")),
1387            ContainerFormat::Matroska
1388        ));
1389        assert!(matches!(
1390            output_format_from_path(std::path::Path::new("out.ogg")),
1391            ContainerFormat::Ogg
1392        ));
1393    }
1394
1395    #[test]
1396    fn test_bytes_as_f32_samples_empty() {
1397        let samples = bytes_as_f32_samples(&[]);
1398        assert!(samples.is_empty());
1399    }
1400
1401    #[test]
1402    fn test_bytes_as_f32_samples_partial() {
1403        // 7 bytes → only 1 full f32 (4 bytes), trailing 3 discarded
1404        let data = [0u8; 7];
1405        let samples = bytes_as_f32_samples(&data);
1406        assert_eq!(samples.len(), 1);
1407    }
1408
1409    #[test]
1410    fn test_bytes_as_f32_known_value() {
1411        // 0.0f32 little-endian = [0x00, 0x00, 0x00, 0x00]
1412        let data = [0x00u8, 0x00, 0x00, 0x00];
1413        let samples = bytes_as_f32_samples(&data);
1414        assert_eq!(samples.len(), 1);
1415        assert_eq!(samples[0], 0.0f32);
1416    }
1417
1418    // ── apply_i16_gain tests ──────────────────────────────────────────────────
1419
1420    #[test]
1421    fn test_apply_i16_gain_unity() {
1422        // gain of 1.0 must be a no-op at the byte level.
1423        let sample: i16 = 1234;
1424        let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1425        let out = apply_i16_gain(raw.clone(), 1.0);
1426        assert_eq!(&out[..], &raw[..]);
1427    }
1428
1429    #[test]
1430    fn test_apply_i16_gain_double() {
1431        // 1000 × 2.0 = 2000
1432        let sample: i16 = 1000;
1433        let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1434        let out = apply_i16_gain(raw, 2.0);
1435        let result = i16::from_le_bytes([out[0], out[1]]);
1436        assert_eq!(result, 2000);
1437    }
1438
1439    #[test]
1440    fn test_apply_i16_gain_clamp_positive() {
1441        // i16::MAX × 2.0 should clamp to i16::MAX.
1442        let sample: i16 = i16::MAX;
1443        let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1444        let out = apply_i16_gain(raw, 2.0);
1445        let result = i16::from_le_bytes([out[0], out[1]]);
1446        assert_eq!(result, i16::MAX);
1447    }
1448
1449    #[test]
1450    fn test_apply_i16_gain_clamp_negative() {
1451        // i16::MIN × 2.0 should clamp to i16::MIN.
1452        let sample: i16 = i16::MIN;
1453        let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1454        let out = apply_i16_gain(raw, 2.0);
1455        let result = i16::from_le_bytes([out[0], out[1]]);
1456        assert_eq!(result, i16::MIN);
1457    }
1458
1459    #[test]
1460    fn test_apply_i16_gain_half() {
1461        // 2000 × 0.5 = 1000
1462        let sample: i16 = 2000;
1463        let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1464        let out = apply_i16_gain(raw, 0.5);
1465        let result = i16::from_le_bytes([out[0], out[1]]);
1466        assert_eq!(result, 1000);
1467    }
1468
1469    #[test]
1470    fn test_apply_i16_gain_odd_byte_length() {
1471        // Buffers with an odd number of bytes: last byte untouched.
1472        let raw = bytes::Bytes::from(vec![0xFFu8, 0x7F, 0xAB]); // [i16::MAX, trailing 0xAB]
1473        let out = apply_i16_gain(raw, 2.0);
1474        // First sample should clamp
1475        let result = i16::from_le_bytes([out[0], out[1]]);
1476        assert_eq!(result, i16::MAX);
1477        // Third byte unchanged
1478        assert_eq!(out[2], 0xAB);
1479    }
1480
1481    // ── PassStats default test ────────────────────────────────────────────────
1482
1483    #[test]
1484    fn test_pass_stats_default() {
1485        let stats = PassStats::default();
1486        assert_eq!(stats.bytes_in, 0);
1487        assert_eq!(stats.bytes_out, 0);
1488        assert_eq!(stats.video_frames, 0);
1489        assert_eq!(stats.audio_frames, 0);
1490    }
1491
1492    // ── Full pipeline execute tests (async, require temp files) ───────────────
1493
1494    /// Build a minimal Matroska byte stream in memory using the muxer, write it
1495    /// to a temp file, run `TranscodePipeline::execute()` over it, and verify
1496    /// the output file is non-empty and the returned stats are meaningful.
1497    ///
1498    /// Uses a video-only stream to avoid codec-specific audio complications.
1499    #[tokio::test]
1500    async fn test_pipeline_execute_remux_produces_output() {
1501        use oximedia_container::{
1502            mux::{MatroskaMuxer, MuxerConfig},
1503            Muxer, Packet, PacketFlags, StreamInfo,
1504        };
1505        use oximedia_core::{CodecId, Rational, Timestamp};
1506        use oximedia_io::MemorySource;
1507
1508        // ── Build a synthetic Matroska file in memory ───────────────────────
1509        let in_buf = MemorySource::new_writable(64 * 1024);
1510        let mut muxer = MatroskaMuxer::new(in_buf, MuxerConfig::new());
1511
1512        let mut video = StreamInfo::new(0, CodecId::Vp9, Rational::new(1, 1000));
1513        video.codec_params.width = Some(320);
1514        video.codec_params.height = Some(240);
1515        muxer.add_stream(video).expect("add stream");
1516        muxer.write_header().await.expect("write header");
1517
1518        // Write 30 synthetic video packets.
1519        for i in 0u64..30 {
1520            let data = vec![0x42u8, 0x00, (i & 0xFF) as u8, 0x01];
1521            let pkt = Packet::new(
1522                0,
1523                bytes::Bytes::from(data),
1524                Timestamp::new(i as i64 * 33, Rational::new(1, 1000)),
1525                PacketFlags::KEYFRAME,
1526            );
1527            muxer.write_packet(&pkt).await.expect("write packet");
1528        }
1529        muxer.write_trailer().await.expect("write trailer");
1530
1531        // Extract the in-memory bytes and write to a temp file.
1532        let tmp_dir = std::env::temp_dir();
1533        let input_path = tmp_dir.join("pipeline_test_input.mkv");
1534        let output_path = tmp_dir.join("pipeline_test_output.mkv");
1535
1536        let sink = muxer.into_sink();
1537        let mkv_bytes = sink.written_data().to_vec();
1538        tokio::fs::write(&input_path, &mkv_bytes)
1539            .await
1540            .expect("write temp input");
1541
1542        // ── Execute the pipeline ────────────────────────────────────────────
1543        let mut pipeline = TranscodePipelineBuilder::new()
1544            .input(input_path.clone())
1545            .output(output_path.clone())
1546            .build()
1547            .expect("build pipeline");
1548
1549        let result = pipeline.execute().await;
1550
1551        // Clean up temp files regardless of outcome.
1552        let _ = tokio::fs::remove_file(&input_path).await;
1553        let _ = tokio::fs::remove_file(&output_path).await;
1554
1555        let output = result.expect("pipeline execute should succeed");
1556
1557        // Stats must be meaningful (non-zero).
1558        assert!(
1559            output.file_size > 0,
1560            "output file size must be > 0, got {}",
1561            output.file_size
1562        );
1563        assert!(
1564            output.encoding_time >= 0.0,
1565            "encoding time must be non-negative"
1566        );
1567    }
1568
1569    /// Same as above but with audio gain normalization wired in.  Verifies
1570    /// that the gain path does not corrupt the output and returns valid stats.
1571    #[tokio::test]
1572    async fn test_pipeline_execute_with_normalization_gain() {
1573        use crate::{LoudnessStandard, NormalizationConfig};
1574        use oximedia_container::{
1575            mux::{MatroskaMuxer, MuxerConfig},
1576            Muxer, Packet, PacketFlags, StreamInfo,
1577        };
1578        use oximedia_core::{CodecId, Rational, Timestamp};
1579        use oximedia_io::MemorySource;
1580
1581        // ── Build synthetic Matroska with audio stream ──────────────────────
1582        let in_buf = MemorySource::new_writable(64 * 1024);
1583        let mut muxer = MatroskaMuxer::new(in_buf, MuxerConfig::new());
1584
1585        let mut audio = StreamInfo::new(0, CodecId::Opus, Rational::new(1, 48000));
1586        audio.codec_params.sample_rate = Some(48000);
1587        audio.codec_params.channels = Some(2);
1588        muxer.add_stream(audio).expect("add audio stream");
1589        muxer.write_header().await.expect("write header");
1590
1591        // Write 20 synthetic audio packets (treated as raw PCM-like bytes).
1592        for i in 0u64..20 {
1593            // 16 i16 samples (LE): all set to 100 (0x64)
1594            let sample_le: i16 = 100;
1595            let mut data = Vec::with_capacity(32);
1596            for _ in 0..16 {
1597                data.extend_from_slice(&sample_le.to_le_bytes());
1598            }
1599            let pkt = Packet::new(
1600                0,
1601                bytes::Bytes::from(data),
1602                Timestamp::new(i as i64 * 960, Rational::new(1, 48000)),
1603                PacketFlags::KEYFRAME,
1604            );
1605            muxer.write_packet(&pkt).await.expect("write audio packet");
1606        }
1607        muxer.write_trailer().await.expect("write trailer");
1608
1609        let tmp_dir = std::env::temp_dir();
1610        let input_path = tmp_dir.join("pipeline_norm_input.mkv");
1611        let output_path = tmp_dir.join("pipeline_norm_output.mkv");
1612
1613        let sink = muxer.into_sink();
1614        let mkv_bytes = sink.written_data().to_vec();
1615        tokio::fs::write(&input_path, &mkv_bytes)
1616            .await
1617            .expect("write temp input");
1618
1619        // Configure a +6 dB normalization gain so we can verify it's applied.
1620        let norm_config = NormalizationConfig::new(LoudnessStandard::EbuR128);
1621        // Manually force a known gain by constructing the pipeline config.
1622        let pipeline_config = PipelineConfig {
1623            input: input_path.clone(),
1624            output: output_path.clone(),
1625            video_codec: None,
1626            audio_codec: None,
1627            quality: None,
1628            multipass: None,
1629            normalization: Some(norm_config),
1630            track_progress: false,
1631            hw_accel: false,
1632        };
1633        let mut pipeline_inner = Pipeline::new(pipeline_config);
1634        // Skip audio analysis by directly setting the gain: +6 dB ≈ ×2.
1635        pipeline_inner.normalization_gain_db = 6.0206;
1636        pipeline_inner.encode_start = Some(std::time::Instant::now());
1637        pipeline_inner.current_stage = PipelineStage::Encode;
1638
1639        let pass_stats = pipeline_inner.execute_single_pass().await;
1640
1641        let _ = tokio::fs::remove_file(&input_path).await;
1642        let _ = tokio::fs::remove_file(&output_path).await;
1643
1644        let stats = pass_stats.expect("single-pass should succeed");
1645        assert!(
1646            stats.audio_frames > 0,
1647            "must have processed at least one audio frame"
1648        );
1649        assert!(
1650            stats.bytes_in > 0,
1651            "must have read at least some bytes from input"
1652        );
1653        assert!(
1654            stats.bytes_out > 0,
1655            "must have written at least some bytes to output"
1656        );
1657    }
1658}