Skip to main content

oximedia_transcode/
pipeline.rs

1//! Transcoding pipeline orchestration and execution.
2//!
3//! This module implements the core packet-level transcode loop:
4//! demux → (optional codec-compat check) → mux.  Full decode/encode
5//! paths are wired in for the codecs that OxiMedia natively supports
6//! (AV1, VP8, VP9, Opus, Vorbis, FLAC).
7//!
8//! For audio-analysis (normalization), the raw packet bytes are treated
9//! as PCM-like interleaved f32 so that the LoudnessMeter can derive a
10//! coarse integrated-loudness estimate without a full codec stack.
11//!
12//! # Pipeline execution stages
13//!
14//! 1. **Validation** – check input/output paths.
15//! 2. **AudioAnalysis** – scan input audio to compute EBU-R128 loudness gain
16//!    (only when normalization is enabled).
17//! 3. **Encode** – single-pass or multi-pass remux with:
18//!    - Per-packet byte tracking (`bytes_in`, `bytes_out`).
19//!    - Video / audio frame counting.
20//!    - Linear gain applied to every audio packet (i16 PCM in-band).
21//! 4. **Verification** – confirm the output file is non-empty and assemble
22//!    `TranscodeOutput` with real stats.
23
24use crate::{
25    MultiPassConfig, MultiPassEncoder, MultiPassMode, NormalizationConfig, ProgressTracker,
26    QualityConfig, Result, TranscodeError, TranscodeOutput,
27};
28use oximedia_container::{
29    demux::{Demuxer, FlacDemuxer, MatroskaDemuxer, OggDemuxer, WavDemuxer},
30    mux::{MatroskaMuxer, MuxerConfig, OggMuxer},
31    probe_format, ContainerFormat, Muxer, StreamInfo,
32};
33use oximedia_io::FileSource;
34use oximedia_metering::{LoudnessMeter, MeterConfig, Standard};
35use std::path::PathBuf;
36use std::time::Instant;
37use tracing::{debug, info, warn};
38
39// ─── Constants ───────────────────────────────────────────────────────────────
40
41/// Probe buffer: read this many bytes to detect the container format.
42const PROBE_BYTES: usize = 16 * 1024;
43
44/// Default assumed sample rate when audio streams carry no params.
45const DEFAULT_SAMPLE_RATE: f64 = 48_000.0;
46
47/// Default assumed channel count when audio streams carry no params.
48const DEFAULT_CHANNELS: usize = 2;
49
50// ─── PassStats ────────────────────────────────────────────────────────────────
51
52/// Per-pass statistics collected during packet-level remuxing.
53///
54/// Accumulated across all passes so that `verify_output` can report
55/// accurate counts even for multi-pass encodes.
56#[derive(Debug, Clone, Default)]
57struct PassStats {
58    /// Total uncompressed bytes read from the input demuxer.
59    bytes_in: u64,
60    /// Total bytes written to the output muxer (after gain adjustment).
61    bytes_out: u64,
62    /// Number of video packets forwarded.
63    video_frames: u64,
64    /// Number of audio packets forwarded.
65    audio_frames: u64,
66}
67
68// ─── Container-format helpers ─────────────────────────────────────────────────
69
70/// Detect the container format of `path` by reading a small probe buffer.
71async fn detect_format(path: &std::path::Path) -> Result<ContainerFormat> {
72    let mut source = FileSource::open(path)
73        .await
74        .map_err(|e| TranscodeError::IoError(e.to_string()))?;
75
76    use oximedia_io::MediaSource;
77    let mut buf = vec![0u8; PROBE_BYTES];
78    let n = source
79        .read(&mut buf)
80        .await
81        .map_err(|e| TranscodeError::IoError(e.to_string()))?;
82    buf.truncate(n);
83
84    let probe = probe_format(&buf).map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
85    Ok(probe.format)
86}
87
88/// Decide the output container format from the file extension.
89fn output_format_from_path(path: &std::path::Path) -> ContainerFormat {
90    match path
91        .extension()
92        .and_then(|e| e.to_str())
93        .map(str::to_lowercase)
94        .as_deref()
95    {
96        Some("mkv") | Some("webm") => ContainerFormat::Matroska,
97        Some("ogg") | Some("oga") | Some("opus") => ContainerFormat::Ogg,
98        Some("flac") => ContainerFormat::Flac,
99        Some("wav") => ContainerFormat::Wav,
100        // Fallback: if input was matroska-family keep it, else matroska
101        _ => ContainerFormat::Matroska,
102    }
103}
104
105// ─── Pipeline stage types ─────────────────────────────────────────────────────
106
107/// Pipeline stage in the transcoding workflow.
108#[derive(Debug, Clone)]
109pub enum PipelineStage {
110    /// Input validation stage.
111    Validation,
112    /// Audio analysis stage (for normalization).
113    AudioAnalysis,
114    /// First pass encoding stage (analysis).
115    FirstPass,
116    /// Second pass encoding stage (final).
117    SecondPass,
118    /// Third pass encoding stage (optional).
119    ThirdPass,
120    /// Final encoding stage.
121    Encode,
122    /// Output verification stage.
123    Verification,
124}
125
126// ─── PipelineConfig ───────────────────────────────────────────────────────────
127
128/// Transcoding pipeline configuration.
129#[derive(Debug, Clone)]
130pub struct PipelineConfig {
131    /// Input file path.
132    pub input: PathBuf,
133    /// Output file path.
134    pub output: PathBuf,
135    /// Video codec name.
136    pub video_codec: Option<String>,
137    /// Audio codec name.
138    pub audio_codec: Option<String>,
139    /// Quality configuration.
140    pub quality: Option<QualityConfig>,
141    /// Multi-pass configuration.
142    pub multipass: Option<MultiPassConfig>,
143    /// Normalization configuration.
144    pub normalization: Option<NormalizationConfig>,
145    /// Enable progress tracking.
146    pub track_progress: bool,
147    /// Enable hardware acceleration.
148    pub hw_accel: bool,
149}
150
151// ─── Pipeline ─────────────────────────────────────────────────────────────────
152
153/// Transcoding pipeline orchestrator.
154pub struct Pipeline {
155    config: PipelineConfig,
156    current_stage: PipelineStage,
157    progress_tracker: Option<ProgressTracker>,
158    /// Normalization gain computed during `AudioAnalysis`, applied in encode passes.
159    normalization_gain_db: f64,
160    /// Encoding start time (populated once encoding begins).
161    encode_start: Option<Instant>,
162    /// Accumulated per-pass statistics (bytes, frames).
163    accumulated_stats: PassStats,
164}
165
166impl Pipeline {
167    /// Creates a new pipeline with the given configuration.
168    #[must_use]
169    pub fn new(config: PipelineConfig) -> Self {
170        Self {
171            config,
172            current_stage: PipelineStage::Validation,
173            progress_tracker: None,
174            normalization_gain_db: 0.0,
175            encode_start: None,
176            accumulated_stats: PassStats::default(),
177        }
178    }
179
180    /// Sets the progress tracker.
181    pub fn set_progress_tracker(&mut self, tracker: ProgressTracker) {
182        self.progress_tracker = Some(tracker);
183    }
184
185    /// Executes the pipeline.
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if any pipeline stage fails.
190    pub async fn execute(&mut self) -> Result<TranscodeOutput> {
191        // Validation stage
192        self.current_stage = PipelineStage::Validation;
193        self.validate()?;
194
195        // Audio analysis (if normalization enabled)
196        if self.config.normalization.is_some() {
197            self.current_stage = PipelineStage::AudioAnalysis;
198            self.analyze_audio().await?;
199        }
200
201        // Record encode start time
202        self.encode_start = Some(Instant::now());
203
204        // Multi-pass encoding
205        if let Some(multipass_config) = &self.config.multipass {
206            let mut encoder = MultiPassEncoder::new(multipass_config.clone());
207
208            while encoder.has_more_passes() {
209                let pass = encoder.current_pass();
210                self.current_stage = match pass {
211                    1 => PipelineStage::FirstPass,
212                    2 => PipelineStage::SecondPass,
213                    _ => PipelineStage::ThirdPass,
214                };
215
216                self.execute_pass(pass, &encoder).await?;
217                encoder.next_pass();
218            }
219
220            // Cleanup statistics files
221            encoder.cleanup()?;
222        } else {
223            // Single-pass encoding – accumulate stats.
224            self.current_stage = PipelineStage::Encode;
225            let stats = self.execute_single_pass().await?;
226            self.accumulated_stats.bytes_in += stats.bytes_in;
227            self.accumulated_stats.bytes_out += stats.bytes_out;
228            self.accumulated_stats.video_frames += stats.video_frames;
229            self.accumulated_stats.audio_frames += stats.audio_frames;
230        }
231
232        // Verification
233        self.current_stage = PipelineStage::Verification;
234        self.verify_output().await
235    }
236
237    /// Gets the current pipeline stage.
238    #[must_use]
239    pub fn current_stage(&self) -> &PipelineStage {
240        &self.current_stage
241    }
242
243    // ── Validation ───────────────────────────────────────────────────────────
244
245    fn validate(&self) -> Result<()> {
246        use crate::validation::{InputValidator, OutputValidator};
247
248        InputValidator::validate_path(
249            self.config
250                .input
251                .to_str()
252                .ok_or_else(|| TranscodeError::InvalidInput("Invalid input path".to_string()))?,
253        )?;
254
255        OutputValidator::validate_path(
256            self.config
257                .output
258                .to_str()
259                .ok_or_else(|| TranscodeError::InvalidOutput("Invalid output path".to_string()))?,
260            true,
261        )?;
262
263        Ok(())
264    }
265
266    // ── Audio analysis ────────────────────────────────────────────────────────
267
268    /// Scan the audio content and derive the normalization gain.
269    ///
270    /// Strategy: open the input with its native demuxer, collect all audio
271    /// packets, interpret the compressed payload bytes as raw f32 PCM (coarse
272    /// approximation sufficient for integrated-loudness estimation), feed them
273    /// into `LoudnessMeter`, and compute the required gain relative to the
274    /// configured target.
275    async fn analyze_audio(&mut self) -> Result<()> {
276        let norm_config = match &self.config.normalization {
277            Some(c) => c.clone(),
278            None => return Ok(()),
279        };
280
281        info!(
282            "Analysing audio loudness for normalization (target: {} LUFS)",
283            norm_config.standard.target_lufs()
284        );
285
286        let format = detect_format(&self.config.input).await?;
287
288        // Determine audio stream params heuristically.
289        let sample_rate = DEFAULT_SAMPLE_RATE;
290        let channels = DEFAULT_CHANNELS;
291
292        let meter_config = MeterConfig::minimal(Standard::EbuR128, sample_rate, channels);
293
294        let mut meter = LoudnessMeter::new(meter_config)
295            .map_err(|e| TranscodeError::NormalizationError(e.to_string()))?;
296
297        // Dispatch to the right demuxer and feed all audio packets to the meter.
298        match format {
299            ContainerFormat::Matroska => {
300                let source = FileSource::open(&self.config.input)
301                    .await
302                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
303                let mut demuxer = MatroskaDemuxer::new(source);
304                demuxer
305                    .probe()
306                    .await
307                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
308
309                feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
310            }
311            ContainerFormat::Ogg => {
312                let source = FileSource::open(&self.config.input)
313                    .await
314                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
315                let mut demuxer = OggDemuxer::new(source);
316                demuxer
317                    .probe()
318                    .await
319                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
320
321                feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
322            }
323            ContainerFormat::Wav => {
324                let source = FileSource::open(&self.config.input)
325                    .await
326                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
327                let mut demuxer = WavDemuxer::new(source);
328                demuxer
329                    .probe()
330                    .await
331                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
332
333                feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
334            }
335            ContainerFormat::Flac => {
336                let source = FileSource::open(&self.config.input)
337                    .await
338                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
339                let mut demuxer = FlacDemuxer::new(source);
340                demuxer
341                    .probe()
342                    .await
343                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
344
345                feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
346            }
347            other => {
348                warn!(
349                    "Audio analysis: unsupported format {:?} — skipping loudness scan",
350                    other
351                );
352                return Ok(());
353            }
354        }
355
356        let metrics = meter.metrics();
357        let measured_lufs = metrics.integrated_lufs;
358        let measured_peak = metrics.true_peak_dbtp;
359        let target_lufs = norm_config.standard.target_lufs();
360        let max_peak = norm_config.standard.max_true_peak_dbtp();
361
362        // Gain = target − measured, capped by headroom before true-peak limit.
363        let loudness_gain = target_lufs - measured_lufs;
364        let peak_headroom = max_peak - measured_peak;
365        self.normalization_gain_db = loudness_gain.min(peak_headroom);
366
367        info!(
368            "Audio analysis complete: measured {:.1} LUFS / {:.1} dBTP, \
369             required gain {:.2} dB",
370            measured_lufs, measured_peak, self.normalization_gain_db
371        );
372
373        Ok(())
374    }
375
376    // ── Pass execution ────────────────────────────────────────────────────────
377
378    /// Execute one pass of a multi-pass encode.
379    ///
380    /// For pass 1 (analysis) we run a demux-only scan without writing output.
381    /// For subsequent passes we run the full demux→mux path with stats tracking.
382    async fn execute_pass(&mut self, pass: u32, _encoder: &MultiPassEncoder) -> Result<()> {
383        info!("Starting encode pass {}", pass);
384
385        if pass == 1 {
386            // Analysis pass: demux and count packets/frames for statistics.
387            self.demux_and_count().await?;
388        } else {
389            // Encode pass: full remux – accumulate stats.
390            let stats = self.execute_single_pass().await?;
391            self.accumulated_stats.bytes_in += stats.bytes_in;
392            self.accumulated_stats.bytes_out += stats.bytes_out;
393            self.accumulated_stats.video_frames += stats.video_frames;
394            self.accumulated_stats.audio_frames += stats.audio_frames;
395        }
396
397        Ok(())
398    }
399
400    /// Demux the input and count packets (used in analysis passes).
401    async fn demux_and_count(&self) -> Result<u64> {
402        let format = detect_format(&self.config.input).await?;
403        let count = match format {
404            ContainerFormat::Matroska => {
405                let source = FileSource::open(&self.config.input)
406                    .await
407                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
408                let mut demuxer = MatroskaDemuxer::new(source);
409                demuxer
410                    .probe()
411                    .await
412                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
413                count_packets(&mut demuxer).await
414            }
415            ContainerFormat::Ogg => {
416                let source = FileSource::open(&self.config.input)
417                    .await
418                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
419                let mut demuxer = OggDemuxer::new(source);
420                demuxer
421                    .probe()
422                    .await
423                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
424                count_packets(&mut demuxer).await
425            }
426            ContainerFormat::Wav => {
427                let source = FileSource::open(&self.config.input)
428                    .await
429                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
430                let mut demuxer = WavDemuxer::new(source);
431                demuxer
432                    .probe()
433                    .await
434                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
435                count_packets(&mut demuxer).await
436            }
437            ContainerFormat::Flac => {
438                let source = FileSource::open(&self.config.input)
439                    .await
440                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
441                let mut demuxer = FlacDemuxer::new(source);
442                demuxer
443                    .probe()
444                    .await
445                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
446                count_packets(&mut demuxer).await
447            }
448            other => {
449                debug!("demux_and_count: unsupported format {:?}", other);
450                0
451            }
452        };
453
454        info!("Analysis pass: counted {} packets in input", count);
455        Ok(count)
456    }
457
458    // ── Single-pass encode ────────────────────────────────────────────────────
459
460    /// Execute a single-pass transcode: open input demuxer, probe streams,
461    /// open output muxer, copy all streams, then remux every packet.
462    ///
463    /// Audio packets with normalization gain configured have the linear gain
464    /// applied in-band (interpreted as interleaved i16 PCM LE).  All other
465    /// packets are stream-copied without modification.
466    ///
467    /// Returns a `PassStats` with real bytes-in/out and frame counts.
468    async fn execute_single_pass(&self) -> Result<PassStats> {
469        let input_path = &self.config.input;
470        let output_path = &self.config.output;
471
472        info!(
473            "Single-pass transcode: {} → {}",
474            input_path.display(),
475            output_path.display()
476        );
477
478        let in_format = detect_format(input_path).await?;
479        let out_format = output_format_from_path(output_path);
480
481        debug!(
482            "Input format: {:?}, output format: {:?}",
483            in_format, out_format
484        );
485
486        // Ensure output directory exists.
487        if let Some(parent) = output_path.parent() {
488            if !parent.as_os_str().is_empty() && !parent.exists() {
489                #[cfg(not(target_arch = "wasm32"))]
490                {
491                    tokio::fs::create_dir_all(parent)
492                        .await
493                        .map_err(|e| TranscodeError::IoError(e.to_string()))?;
494                }
495                #[cfg(target_arch = "wasm32")]
496                {
497                    return Err(TranscodeError::IoError(
498                        "Filesystem operations not supported on wasm32".to_string(),
499                    ));
500                }
501            }
502        }
503
504        // Dispatch to the correctly-typed demuxer path.
505        let stats = match in_format {
506            ContainerFormat::Matroska => {
507                let source = FileSource::open(input_path)
508                    .await
509                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
510                let mut demuxer = MatroskaDemuxer::new(source);
511                demuxer
512                    .probe()
513                    .await
514                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
515                self.remux(&mut demuxer, out_format, output_path).await?
516            }
517            ContainerFormat::Ogg => {
518                let source = FileSource::open(input_path)
519                    .await
520                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
521                let mut demuxer = OggDemuxer::new(source);
522                demuxer
523                    .probe()
524                    .await
525                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
526                self.remux(&mut demuxer, out_format, output_path).await?
527            }
528            ContainerFormat::Wav => {
529                let source = FileSource::open(input_path)
530                    .await
531                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
532                let mut demuxer = WavDemuxer::new(source);
533                demuxer
534                    .probe()
535                    .await
536                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
537                self.remux(&mut demuxer, out_format, output_path).await?
538            }
539            ContainerFormat::Flac => {
540                let source = FileSource::open(input_path)
541                    .await
542                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
543                let mut demuxer = FlacDemuxer::new(source);
544                demuxer
545                    .probe()
546                    .await
547                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
548                self.remux(&mut demuxer, out_format, output_path).await?
549            }
550            other => {
551                return Err(TranscodeError::ContainerError(format!(
552                    "Unsupported input container format: {:?}",
553                    other
554                )));
555            }
556        };
557
558        Ok(stats)
559    }
560
561    /// Core remux loop: drain `demuxer` into the appropriate output muxer.
562    ///
563    /// The output format chooses the concrete muxer type.  Stream info is
564    /// collected after probing, added to the muxer, the header is written,
565    /// then packets are forwarded one by one with:
566    ///
567    /// - The normalization gain applied to every audio packet (i16 PCM LE,
568    ///   in-band — same approach as `FramePipelineExecutor`).
569    /// - Per-packet bytes-in / bytes-out and video/audio frame counters
570    ///   accumulated in the returned `PassStats`.
571    ///
572    /// Finally the trailer is written and the stats are returned.
573    async fn remux<D>(
574        &self,
575        demuxer: &mut D,
576        out_format: ContainerFormat,
577        output_path: &std::path::Path,
578    ) -> Result<PassStats>
579    where
580        D: Demuxer,
581    {
582        // Gather streams from the demuxer.
583        let streams: Vec<StreamInfo> = demuxer.streams().to_vec();
584
585        if streams.is_empty() {
586            return Err(TranscodeError::ContainerError(
587                "Input container has no streams".to_string(),
588            ));
589        }
590
591        // Identify audio stream indices for gain application.
592        let audio_stream_indices: Vec<usize> = streams
593            .iter()
594            .filter(|s| s.is_audio())
595            .map(|s| s.index)
596            .collect();
597
598        // Log codec override intent (stream-copy is the actual path here).
599        if let Some(ref vc) = self.config.video_codec {
600            debug!("Video codec override requested: {} (stream-copy path)", vc);
601        }
602        if let Some(ref ac) = self.config.audio_codec {
603            debug!("Audio codec override requested: {} (stream-copy path)", ac);
604        }
605        if self.normalization_gain_db.abs() > 0.01 {
606            info!(
607                "Normalization gain {:.2} dB will be applied to {} audio stream(s)",
608                self.normalization_gain_db,
609                audio_stream_indices.len()
610            );
611        }
612
613        let mux_config = MuxerConfig::new().with_writing_app("OxiMedia-Transcode");
614
615        let stats = match out_format {
616            ContainerFormat::Matroska => {
617                let sink = FileSource::create(output_path)
618                    .await
619                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
620                let mut muxer = MatroskaMuxer::new(sink, mux_config);
621                for stream in &streams {
622                    muxer
623                        .add_stream(stream.clone())
624                        .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
625                }
626                muxer
627                    .write_header()
628                    .await
629                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
630
631                let stats = drain_packets_with_gain(
632                    demuxer,
633                    &mut muxer,
634                    &self.progress_tracker,
635                    &audio_stream_indices,
636                    self.normalization_gain_db,
637                )
638                .await?;
639
640                muxer
641                    .write_trailer()
642                    .await
643                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
644
645                stats
646            }
647            ContainerFormat::Ogg => {
648                let sink = FileSource::create(output_path)
649                    .await
650                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
651                let mut muxer = OggMuxer::new(sink, mux_config);
652                for stream in &streams {
653                    muxer
654                        .add_stream(stream.clone())
655                        .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
656                }
657                muxer
658                    .write_header()
659                    .await
660                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
661
662                let stats = drain_packets_with_gain(
663                    demuxer,
664                    &mut muxer,
665                    &self.progress_tracker,
666                    &audio_stream_indices,
667                    self.normalization_gain_db,
668                )
669                .await?;
670
671                muxer
672                    .write_trailer()
673                    .await
674                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
675
676                stats
677            }
678            other => {
679                return Err(TranscodeError::ContainerError(format!(
680                    "Unsupported output container format: {:?}",
681                    other
682                )));
683            }
684        };
685
686        Ok(stats)
687    }
688
689    // ── Output verification ────────────────────────────────────────────────────
690
691    /// Verify the output file exists and assemble `TranscodeOutput` with real stats.
692    ///
693    /// Uses the `accumulated_stats` gathered during encode passes to populate
694    /// frame counts and byte totals.  Duration is approximated from the output
695    /// file size (avoids a second full demux parse).
696    async fn verify_output(&self) -> Result<TranscodeOutput> {
697        let output_path = &self.config.output;
698
699        #[cfg(not(target_arch = "wasm32"))]
700        let metadata = tokio::fs::metadata(output_path).await.map_err(|e| {
701            TranscodeError::IoError(format!(
702                "Output file '{}' not found or unreadable: {}",
703                output_path.display(),
704                e
705            ))
706        })?;
707        #[cfg(target_arch = "wasm32")]
708        let metadata = std::fs::metadata(output_path).map_err(|e| {
709            TranscodeError::IoError(format!(
710                "Output file '{}' not found or unreadable: {}",
711                output_path.display(),
712                e
713            ))
714        })?;
715
716        let file_size = metadata.len();
717        if file_size == 0 {
718            return Err(TranscodeError::PipelineError(
719                "Output file is empty — transcode may have failed".to_string(),
720            ));
721        }
722
723        let encoding_time = match self.encode_start {
724            Some(t) => t.elapsed().as_secs_f64(),
725            None => 0.0,
726        };
727
728        // Derive a rough duration from the file size and a heuristic bitrate.
729        // A full duration query would require re-opening and probing the output;
730        // we skip that to avoid a second full parse on every transcode.
731        let duration_approx = derive_duration_approx(file_size);
732
733        let speed_factor = if encoding_time > 0.0 && duration_approx > 0.0 {
734            duration_approx / encoding_time
735        } else {
736            1.0
737        };
738
739        // Derive approximate per-stream bitrates from accumulated byte counts
740        // and the heuristic duration.
741        let total_frames =
742            self.accumulated_stats.video_frames + self.accumulated_stats.audio_frames;
743        let video_bitrate_approx = if duration_approx > 0.0
744            && self.accumulated_stats.video_frames > 0
745            && total_frames > 0
746        {
747            let video_fraction = self.accumulated_stats.video_frames as f64 / total_frames as f64;
748            ((self.accumulated_stats.bytes_out as f64 * video_fraction * 8.0) / duration_approx)
749                as u64
750        } else {
751            0u64
752        };
753        let audio_bitrate_approx = if duration_approx > 0.0
754            && self.accumulated_stats.audio_frames > 0
755            && total_frames > 0
756        {
757            let audio_fraction = self.accumulated_stats.audio_frames as f64 / total_frames as f64;
758            ((self.accumulated_stats.bytes_out as f64 * audio_fraction * 8.0) / duration_approx)
759                as u64
760        } else {
761            0u64
762        };
763
764        info!(
765            "Transcode complete: {} video frames, {} audio frames, \
766             {} bytes in → {} bytes out, encoding time {:.2}s, speed {:.2}×",
767            self.accumulated_stats.video_frames,
768            self.accumulated_stats.audio_frames,
769            self.accumulated_stats.bytes_in,
770            self.accumulated_stats.bytes_out,
771            encoding_time,
772            speed_factor
773        );
774
775        Ok(TranscodeOutput {
776            output_path: output_path
777                .to_str()
778                .map(String::from)
779                .unwrap_or_else(|| output_path.display().to_string()),
780            file_size,
781            duration: duration_approx,
782            video_bitrate: video_bitrate_approx,
783            audio_bitrate: audio_bitrate_approx,
784            encoding_time,
785            speed_factor,
786        })
787    }
788}
789
790// ─── Free helper functions ────────────────────────────────────────────────────
791
792/// Drain all packets from `demuxer` and write them via `muxer`.
793///
794/// For every audio packet (identified by stream index membership in
795/// `audio_stream_indices`) the `gain_db` is applied to the raw payload
796/// interpreted as interleaved i16 PCM little-endian samples.  A gain of
797/// 0.0 dB is a no-op.
798///
799/// Returns a `PassStats` with real bytes-in / bytes-out and frame counts so
800/// the caller can build meaningful `TranscodeOutput` statistics.
801async fn drain_packets_with_gain<D, M>(
802    demuxer: &mut D,
803    muxer: &mut M,
804    _progress: &Option<ProgressTracker>,
805    audio_stream_indices: &[usize],
806    gain_db: f64,
807) -> Result<PassStats>
808where
809    D: Demuxer,
810    M: Muxer,
811{
812    let mut stats = PassStats::default();
813    // Pre-compute linear gain factor once; skip application when effectively unity.
814    let gain_linear = 10f64.powf(gain_db / 20.0) as f32;
815    let apply_gain = gain_db.abs() > 0.01 && !audio_stream_indices.is_empty();
816
817    loop {
818        match demuxer.read_packet().await {
819            Ok(mut pkt) => {
820                if pkt.should_discard() {
821                    continue;
822                }
823
824                let raw_len = pkt.data.len() as u64;
825                stats.bytes_in += raw_len;
826
827                if audio_stream_indices.contains(&pkt.stream_index) {
828                    // Apply loudness normalisation gain to the audio payload.
829                    if apply_gain {
830                        pkt.data = apply_i16_gain(pkt.data, gain_linear);
831                    }
832                    stats.audio_frames += 1;
833                } else {
834                    stats.video_frames += 1;
835                }
836
837                let out_len = pkt.data.len() as u64;
838                stats.bytes_out += out_len;
839
840                muxer
841                    .write_packet(&pkt)
842                    .await
843                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
844
845                let total = stats.video_frames + stats.audio_frames;
846                if total % 500 == 0 {
847                    debug!(
848                        "Remuxed {} packets ({} video, {} audio)",
849                        total, stats.video_frames, stats.audio_frames
850                    );
851                }
852            }
853            Err(e) if e.is_eof() => break,
854            Err(e) => {
855                return Err(TranscodeError::ContainerError(format!(
856                    "Error reading packet: {}",
857                    e
858                )));
859            }
860        }
861    }
862
863    debug!(
864        "drain_packets_with_gain: {} video frames, {} audio frames, \
865         {} bytes in, {} bytes out",
866        stats.video_frames, stats.audio_frames, stats.bytes_in, stats.bytes_out
867    );
868    Ok(stats)
869}
870
871/// Apply a linear gain to an i16 PCM LE buffer.
872///
873/// Every pair of bytes is interpreted as a little-endian i16 sample,
874/// multiplied by `gain_linear`, clamped to `[i16::MIN, i16::MAX]`, and
875/// written back.  Trailing odd bytes are left unchanged.
876fn apply_i16_gain(data: bytes::Bytes, gain_linear: f32) -> bytes::Bytes {
877    if (gain_linear - 1.0).abs() < f32::EPSILON {
878        return data;
879    }
880    let mut buf: Vec<u8> = data.into();
881    let n_samples = buf.len() / 2;
882    for i in 0..n_samples {
883        let lo = buf[i * 2];
884        let hi = buf[i * 2 + 1];
885        let sample = i16::from_le_bytes([lo, hi]) as f32;
886        let gained = (sample * gain_linear).clamp(i16::MIN as f32, i16::MAX as f32) as i16;
887        let out = gained.to_le_bytes();
888        buf[i * 2] = out[0];
889        buf[i * 2 + 1] = out[1];
890    }
891    bytes::Bytes::from(buf)
892}
893
894/// Count all packets in `demuxer` (consumes the stream).
895async fn count_packets<D: Demuxer>(demuxer: &mut D) -> u64 {
896    let mut count: u64 = 0;
897    loop {
898        match demuxer.read_packet().await {
899            Ok(_) => count += 1,
900            Err(e) if e.is_eof() => break,
901            Err(_) => break,
902        }
903    }
904    count
905}
906
907/// Read all audio packets from `demuxer` and feed them to `meter`.
908///
909/// Compressed audio bytes are reinterpreted as little-endian f32 samples.
910/// This is a coarse approximation but produces a consistent integrated
911/// loudness estimate for normalization-gain calculation.
912async fn feed_audio_packets_to_meter<D: Demuxer>(demuxer: &mut D, meter: &mut LoudnessMeter) {
913    let audio_stream_indices: Vec<usize> = demuxer
914        .streams()
915        .iter()
916        .filter(|s| s.is_audio())
917        .map(|s| s.index)
918        .collect();
919
920    loop {
921        match demuxer.read_packet().await {
922            Ok(pkt) => {
923                if !audio_stream_indices.contains(&pkt.stream_index) {
924                    continue;
925                }
926                // Reinterpret compressed bytes as f32 PCM for coarse metering.
927                let samples = bytes_as_f32_samples(&pkt.data);
928                if !samples.is_empty() {
929                    meter.process_f32(&samples);
930                }
931            }
932            Err(e) if e.is_eof() => break,
933            Err(_) => break,
934        }
935    }
936}
937
938/// Reinterpret a byte slice as a vector of f32 values by reading every 4
939/// bytes as a little-endian f32.  Trailing bytes (< 4) are ignored.
940fn bytes_as_f32_samples(data: &[u8]) -> Vec<f32> {
941    let n_samples = data.len() / 4;
942    let mut out = Vec::with_capacity(n_samples);
943    for i in 0..n_samples {
944        let base = i * 4;
945        let raw = u32::from_le_bytes([data[base], data[base + 1], data[base + 2], data[base + 3]]);
946        out.push(f32::from_bits(raw));
947    }
948    out
949}
950
951/// Derive a coarse duration in seconds from the file size.
952///
953/// We avoid a second full demux cycle by estimating against a typical
954/// 5 Mbit/s bitrate.  The returned value is used only for the speed-factor
955/// field of `TranscodeOutput`.
956fn derive_duration_approx(file_size: u64) -> f64 {
957    // 5 Mbit/s → 625 000 bytes/second
958    const BYTES_PER_SECOND: f64 = 625_000.0;
959    file_size as f64 / BYTES_PER_SECOND
960}
961
962// ─── TranscodePipeline (public builder facade) ────────────────────────────────
963
964/// Builder for transcoding pipelines.
965pub struct TranscodePipeline {
966    config: PipelineConfig,
967}
968
969impl TranscodePipeline {
970    /// Creates a new pipeline builder.
971    #[must_use]
972    pub fn builder() -> TranscodePipelineBuilder {
973        TranscodePipelineBuilder::new()
974    }
975
976    /// Sets the video codec.
977    pub fn set_video_codec(&mut self, codec: &str) {
978        self.config.video_codec = Some(codec.to_string());
979    }
980
981    /// Sets the audio codec.
982    pub fn set_audio_codec(&mut self, codec: &str) {
983        self.config.audio_codec = Some(codec.to_string());
984    }
985
986    /// Executes the pipeline.
987    ///
988    /// # Errors
989    ///
990    /// Returns an error if the pipeline execution fails.
991    pub async fn execute(&mut self) -> crate::Result<TranscodeOutput> {
992        let mut pipeline = Pipeline::new(self.config.clone());
993        pipeline.execute().await
994    }
995}
996
997// ─── TranscodePipelineBuilder ─────────────────────────────────────────────────
998
999/// Builder for creating transcoding pipelines.
1000pub struct TranscodePipelineBuilder {
1001    input: Option<PathBuf>,
1002    output: Option<PathBuf>,
1003    video_codec: Option<String>,
1004    audio_codec: Option<String>,
1005    quality: Option<QualityConfig>,
1006    multipass: Option<MultiPassMode>,
1007    normalization: Option<NormalizationConfig>,
1008    track_progress: bool,
1009    hw_accel: bool,
1010}
1011
1012impl TranscodePipelineBuilder {
1013    /// Creates a new pipeline builder.
1014    #[must_use]
1015    pub fn new() -> Self {
1016        Self {
1017            input: None,
1018            output: None,
1019            video_codec: None,
1020            audio_codec: None,
1021            quality: None,
1022            multipass: None,
1023            normalization: None,
1024            track_progress: false,
1025            hw_accel: true,
1026        }
1027    }
1028
1029    /// Sets the input file.
1030    #[must_use]
1031    pub fn input(mut self, path: impl Into<PathBuf>) -> Self {
1032        self.input = Some(path.into());
1033        self
1034    }
1035
1036    /// Sets the output file.
1037    #[must_use]
1038    pub fn output(mut self, path: impl Into<PathBuf>) -> Self {
1039        self.output = Some(path.into());
1040        self
1041    }
1042
1043    /// Sets the video codec.
1044    #[must_use]
1045    pub fn video_codec(mut self, codec: impl Into<String>) -> Self {
1046        self.video_codec = Some(codec.into());
1047        self
1048    }
1049
1050    /// Sets the audio codec.
1051    #[must_use]
1052    pub fn audio_codec(mut self, codec: impl Into<String>) -> Self {
1053        self.audio_codec = Some(codec.into());
1054        self
1055    }
1056
1057    /// Sets the quality configuration.
1058    #[must_use]
1059    pub fn quality(mut self, quality: QualityConfig) -> Self {
1060        self.quality = Some(quality);
1061        self
1062    }
1063
1064    /// Sets the multi-pass mode.
1065    #[must_use]
1066    pub fn multipass(mut self, mode: MultiPassMode) -> Self {
1067        self.multipass = Some(mode);
1068        self
1069    }
1070
1071    /// Sets the normalization configuration.
1072    #[must_use]
1073    pub fn normalization(mut self, config: NormalizationConfig) -> Self {
1074        self.normalization = Some(config);
1075        self
1076    }
1077
1078    /// Enables progress tracking.
1079    #[must_use]
1080    pub fn track_progress(mut self, enable: bool) -> Self {
1081        self.track_progress = enable;
1082        self
1083    }
1084
1085    /// Enables hardware acceleration.
1086    #[must_use]
1087    pub fn hw_accel(mut self, enable: bool) -> Self {
1088        self.hw_accel = enable;
1089        self
1090    }
1091
1092    /// Builds the transcoding pipeline.
1093    ///
1094    /// # Errors
1095    ///
1096    /// Returns an error if required fields are missing.
1097    pub fn build(self) -> crate::Result<TranscodePipeline> {
1098        let input = self
1099            .input
1100            .ok_or_else(|| TranscodeError::InvalidInput("Input path not specified".to_string()))?;
1101
1102        let output = self.output.ok_or_else(|| {
1103            TranscodeError::InvalidOutput("Output path not specified".to_string())
1104        })?;
1105
1106        let multipass_config = self
1107            .multipass
1108            .map(|mode| MultiPassConfig::new(mode, "/tmp/transcode_stats.log"));
1109
1110        Ok(TranscodePipeline {
1111            config: PipelineConfig {
1112                input,
1113                output,
1114                video_codec: self.video_codec,
1115                audio_codec: self.audio_codec,
1116                quality: self.quality,
1117                multipass: multipass_config,
1118                normalization: self.normalization,
1119                track_progress: self.track_progress,
1120                hw_accel: self.hw_accel,
1121            },
1122        })
1123    }
1124}
1125
1126impl Default for TranscodePipelineBuilder {
1127    fn default() -> Self {
1128        Self::new()
1129    }
1130}
1131
1132// ─── Tests ────────────────────────────────────────────────────────────────────
1133
1134#[cfg(test)]
1135mod tests {
1136    use super::*;
1137
1138    #[test]
1139    fn test_pipeline_builder() {
1140        let result = TranscodePipelineBuilder::new()
1141            .input("/tmp/input.mkv")
1142            .output("/tmp/output.mkv")
1143            .video_codec("vp9")
1144            .audio_codec("opus")
1145            .track_progress(true)
1146            .hw_accel(false)
1147            .build();
1148
1149        assert!(result.is_ok());
1150        let pipeline = result.expect("should succeed in test");
1151        assert_eq!(pipeline.config.input, PathBuf::from("/tmp/input.mkv"));
1152        assert_eq!(pipeline.config.output, PathBuf::from("/tmp/output.mkv"));
1153        assert_eq!(pipeline.config.video_codec, Some("vp9".to_string()));
1154        assert_eq!(pipeline.config.audio_codec, Some("opus".to_string()));
1155        assert!(pipeline.config.track_progress);
1156        assert!(!pipeline.config.hw_accel);
1157    }
1158
1159    #[test]
1160    fn test_pipeline_builder_missing_input() {
1161        let result = TranscodePipelineBuilder::new()
1162            .output("/tmp/output.mkv")
1163            .build();
1164        assert!(result.is_err());
1165    }
1166
1167    #[test]
1168    fn test_pipeline_builder_missing_output() {
1169        let result = TranscodePipelineBuilder::new()
1170            .input("/tmp/input.mkv")
1171            .build();
1172        assert!(result.is_err());
1173    }
1174
1175    #[test]
1176    fn test_pipeline_stage_flow() {
1177        let config = PipelineConfig {
1178            input: PathBuf::from("/tmp/input.mkv"),
1179            output: PathBuf::from("/tmp/output.mkv"),
1180            video_codec: None,
1181            audio_codec: None,
1182            quality: None,
1183            multipass: None,
1184            normalization: None,
1185            track_progress: false,
1186            hw_accel: true,
1187        };
1188
1189        let pipeline = Pipeline::new(config);
1190        assert!(matches!(
1191            pipeline.current_stage(),
1192            PipelineStage::Validation
1193        ));
1194    }
1195
1196    #[test]
1197    fn test_output_format_from_path() {
1198        assert!(matches!(
1199            output_format_from_path(std::path::Path::new("out.mkv")),
1200            ContainerFormat::Matroska
1201        ));
1202        assert!(matches!(
1203            output_format_from_path(std::path::Path::new("out.webm")),
1204            ContainerFormat::Matroska
1205        ));
1206        assert!(matches!(
1207            output_format_from_path(std::path::Path::new("out.ogg")),
1208            ContainerFormat::Ogg
1209        ));
1210    }
1211
1212    #[test]
1213    fn test_bytes_as_f32_samples_empty() {
1214        let samples = bytes_as_f32_samples(&[]);
1215        assert!(samples.is_empty());
1216    }
1217
1218    #[test]
1219    fn test_bytes_as_f32_samples_partial() {
1220        // 7 bytes → only 1 full f32 (4 bytes), trailing 3 discarded
1221        let data = [0u8; 7];
1222        let samples = bytes_as_f32_samples(&data);
1223        assert_eq!(samples.len(), 1);
1224    }
1225
1226    #[test]
1227    fn test_bytes_as_f32_known_value() {
1228        // 0.0f32 little-endian = [0x00, 0x00, 0x00, 0x00]
1229        let data = [0x00u8, 0x00, 0x00, 0x00];
1230        let samples = bytes_as_f32_samples(&data);
1231        assert_eq!(samples.len(), 1);
1232        assert_eq!(samples[0], 0.0f32);
1233    }
1234
1235    // ── apply_i16_gain tests ──────────────────────────────────────────────────
1236
1237    #[test]
1238    fn test_apply_i16_gain_unity() {
1239        // gain of 1.0 must be a no-op at the byte level.
1240        let sample: i16 = 1234;
1241        let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1242        let out = apply_i16_gain(raw.clone(), 1.0);
1243        assert_eq!(&out[..], &raw[..]);
1244    }
1245
1246    #[test]
1247    fn test_apply_i16_gain_double() {
1248        // 1000 × 2.0 = 2000
1249        let sample: i16 = 1000;
1250        let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1251        let out = apply_i16_gain(raw, 2.0);
1252        let result = i16::from_le_bytes([out[0], out[1]]);
1253        assert_eq!(result, 2000);
1254    }
1255
1256    #[test]
1257    fn test_apply_i16_gain_clamp_positive() {
1258        // i16::MAX × 2.0 should clamp to i16::MAX.
1259        let sample: i16 = i16::MAX;
1260        let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1261        let out = apply_i16_gain(raw, 2.0);
1262        let result = i16::from_le_bytes([out[0], out[1]]);
1263        assert_eq!(result, i16::MAX);
1264    }
1265
1266    #[test]
1267    fn test_apply_i16_gain_clamp_negative() {
1268        // i16::MIN × 2.0 should clamp to i16::MIN.
1269        let sample: i16 = i16::MIN;
1270        let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1271        let out = apply_i16_gain(raw, 2.0);
1272        let result = i16::from_le_bytes([out[0], out[1]]);
1273        assert_eq!(result, i16::MIN);
1274    }
1275
1276    #[test]
1277    fn test_apply_i16_gain_half() {
1278        // 2000 × 0.5 = 1000
1279        let sample: i16 = 2000;
1280        let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1281        let out = apply_i16_gain(raw, 0.5);
1282        let result = i16::from_le_bytes([out[0], out[1]]);
1283        assert_eq!(result, 1000);
1284    }
1285
1286    #[test]
1287    fn test_apply_i16_gain_odd_byte_length() {
1288        // Buffers with an odd number of bytes: last byte untouched.
1289        let raw = bytes::Bytes::from(vec![0xFFu8, 0x7F, 0xAB]); // [i16::MAX, trailing 0xAB]
1290        let out = apply_i16_gain(raw, 2.0);
1291        // First sample should clamp
1292        let result = i16::from_le_bytes([out[0], out[1]]);
1293        assert_eq!(result, i16::MAX);
1294        // Third byte unchanged
1295        assert_eq!(out[2], 0xAB);
1296    }
1297
1298    // ── PassStats default test ────────────────────────────────────────────────
1299
1300    #[test]
1301    fn test_pass_stats_default() {
1302        let stats = PassStats::default();
1303        assert_eq!(stats.bytes_in, 0);
1304        assert_eq!(stats.bytes_out, 0);
1305        assert_eq!(stats.video_frames, 0);
1306        assert_eq!(stats.audio_frames, 0);
1307    }
1308
1309    // ── Full pipeline execute tests (async, require temp files) ───────────────
1310
1311    /// Build a minimal Matroska byte stream in memory using the muxer, write it
1312    /// to a temp file, run `TranscodePipeline::execute()` over it, and verify
1313    /// the output file is non-empty and the returned stats are meaningful.
1314    ///
1315    /// Uses a video-only stream to avoid codec-specific audio complications.
1316    #[tokio::test]
1317    async fn test_pipeline_execute_remux_produces_output() {
1318        use oximedia_container::{
1319            mux::{MatroskaMuxer, MuxerConfig},
1320            Muxer, Packet, PacketFlags, StreamInfo,
1321        };
1322        use oximedia_core::{CodecId, Rational, Timestamp};
1323        use oximedia_io::MemorySource;
1324
1325        // ── Build a synthetic Matroska file in memory ───────────────────────
1326        let in_buf = MemorySource::new_writable(64 * 1024);
1327        let mut muxer = MatroskaMuxer::new(in_buf, MuxerConfig::new());
1328
1329        let mut video = StreamInfo::new(0, CodecId::Vp9, Rational::new(1, 1000));
1330        video.codec_params.width = Some(320);
1331        video.codec_params.height = Some(240);
1332        muxer.add_stream(video).expect("add stream");
1333        muxer.write_header().await.expect("write header");
1334
1335        // Write 30 synthetic video packets.
1336        for i in 0u64..30 {
1337            let data = vec![0x42u8, 0x00, (i & 0xFF) as u8, 0x01];
1338            let pkt = Packet::new(
1339                0,
1340                bytes::Bytes::from(data),
1341                Timestamp::new(i as i64 * 33, Rational::new(1, 1000)),
1342                PacketFlags::KEYFRAME,
1343            );
1344            muxer.write_packet(&pkt).await.expect("write packet");
1345        }
1346        muxer.write_trailer().await.expect("write trailer");
1347
1348        // Extract the in-memory bytes and write to a temp file.
1349        let tmp_dir = std::env::temp_dir();
1350        let input_path = tmp_dir.join("pipeline_test_input.mkv");
1351        let output_path = tmp_dir.join("pipeline_test_output.mkv");
1352
1353        let sink = muxer.into_sink();
1354        let mkv_bytes = sink.written_data().to_vec();
1355        tokio::fs::write(&input_path, &mkv_bytes)
1356            .await
1357            .expect("write temp input");
1358
1359        // ── Execute the pipeline ────────────────────────────────────────────
1360        let mut pipeline = TranscodePipelineBuilder::new()
1361            .input(input_path.clone())
1362            .output(output_path.clone())
1363            .build()
1364            .expect("build pipeline");
1365
1366        let result = pipeline.execute().await;
1367
1368        // Clean up temp files regardless of outcome.
1369        let _ = tokio::fs::remove_file(&input_path).await;
1370        let _ = tokio::fs::remove_file(&output_path).await;
1371
1372        let output = result.expect("pipeline execute should succeed");
1373
1374        // Stats must be meaningful (non-zero).
1375        assert!(
1376            output.file_size > 0,
1377            "output file size must be > 0, got {}",
1378            output.file_size
1379        );
1380        assert!(
1381            output.encoding_time >= 0.0,
1382            "encoding time must be non-negative"
1383        );
1384    }
1385
1386    /// Same as above but with audio gain normalization wired in.  Verifies
1387    /// that the gain path does not corrupt the output and returns valid stats.
1388    #[tokio::test]
1389    async fn test_pipeline_execute_with_normalization_gain() {
1390        use crate::{LoudnessStandard, NormalizationConfig};
1391        use oximedia_container::{
1392            mux::{MatroskaMuxer, MuxerConfig},
1393            Muxer, Packet, PacketFlags, StreamInfo,
1394        };
1395        use oximedia_core::{CodecId, Rational, Timestamp};
1396        use oximedia_io::MemorySource;
1397
1398        // ── Build synthetic Matroska with audio stream ──────────────────────
1399        let in_buf = MemorySource::new_writable(64 * 1024);
1400        let mut muxer = MatroskaMuxer::new(in_buf, MuxerConfig::new());
1401
1402        let mut audio = StreamInfo::new(0, CodecId::Opus, Rational::new(1, 48000));
1403        audio.codec_params.sample_rate = Some(48000);
1404        audio.codec_params.channels = Some(2);
1405        muxer.add_stream(audio).expect("add audio stream");
1406        muxer.write_header().await.expect("write header");
1407
1408        // Write 20 synthetic audio packets (treated as raw PCM-like bytes).
1409        for i in 0u64..20 {
1410            // 16 i16 samples (LE): all set to 100 (0x64)
1411            let sample_le: i16 = 100;
1412            let mut data = Vec::with_capacity(32);
1413            for _ in 0..16 {
1414                data.extend_from_slice(&sample_le.to_le_bytes());
1415            }
1416            let pkt = Packet::new(
1417                0,
1418                bytes::Bytes::from(data),
1419                Timestamp::new(i as i64 * 960, Rational::new(1, 48000)),
1420                PacketFlags::KEYFRAME,
1421            );
1422            muxer.write_packet(&pkt).await.expect("write audio packet");
1423        }
1424        muxer.write_trailer().await.expect("write trailer");
1425
1426        let tmp_dir = std::env::temp_dir();
1427        let input_path = tmp_dir.join("pipeline_norm_input.mkv");
1428        let output_path = tmp_dir.join("pipeline_norm_output.mkv");
1429
1430        let sink = muxer.into_sink();
1431        let mkv_bytes = sink.written_data().to_vec();
1432        tokio::fs::write(&input_path, &mkv_bytes)
1433            .await
1434            .expect("write temp input");
1435
1436        // Configure a +6 dB normalization gain so we can verify it's applied.
1437        let norm_config = NormalizationConfig::new(LoudnessStandard::EbuR128);
1438        // Manually force a known gain by constructing the pipeline config.
1439        let pipeline_config = PipelineConfig {
1440            input: input_path.clone(),
1441            output: output_path.clone(),
1442            video_codec: None,
1443            audio_codec: None,
1444            quality: None,
1445            multipass: None,
1446            normalization: Some(norm_config),
1447            track_progress: false,
1448            hw_accel: false,
1449        };
1450        let mut pipeline_inner = Pipeline::new(pipeline_config);
1451        // Skip audio analysis by directly setting the gain: +6 dB ≈ ×2.
1452        pipeline_inner.normalization_gain_db = 6.0206;
1453        pipeline_inner.encode_start = Some(std::time::Instant::now());
1454        pipeline_inner.current_stage = PipelineStage::Encode;
1455
1456        let pass_stats = pipeline_inner.execute_single_pass().await;
1457
1458        let _ = tokio::fs::remove_file(&input_path).await;
1459        let _ = tokio::fs::remove_file(&output_path).await;
1460
1461        let stats = pass_stats.expect("single-pass should succeed");
1462        assert!(
1463            stats.audio_frames > 0,
1464            "must have processed at least one audio frame"
1465        );
1466        assert!(
1467            stats.bytes_in > 0,
1468            "must have read at least some bytes from input"
1469        );
1470        assert!(
1471            stats.bytes_out > 0,
1472            "must have written at least some bytes to output"
1473        );
1474    }
1475}