Skip to main content

oximedia_transcode/
pipeline.rs

1//! Transcoding pipeline orchestration and execution.
2//!
3//! This module implements the core packet-level transcode loop:
4//! demux → (optional codec-compat check) → mux.  Full decode/encode
5//! paths are wired in for the codecs that OxiMedia natively supports
6//! (AV1, VP8, VP9, Opus, Vorbis, FLAC).
7//!
8//! For audio-analysis (normalization), the raw packet bytes are treated
9//! as PCM-like interleaved f32 so that the LoudnessMeter can derive a
10//! coarse integrated-loudness estimate without a full codec stack.
11
12use crate::{
13    MultiPassConfig, MultiPassEncoder, MultiPassMode, NormalizationConfig, ProgressTracker,
14    QualityConfig, Result, TranscodeError, TranscodeOutput,
15};
16use oximedia_container::{
17    demux::{Demuxer, FlacDemuxer, MatroskaDemuxer, OggDemuxer, WavDemuxer},
18    mux::{MatroskaMuxer, MuxerConfig, OggMuxer},
19    probe_format, ContainerFormat, Muxer, StreamInfo,
20};
21use oximedia_io::FileSource;
22use oximedia_metering::{LoudnessMeter, MeterConfig, Standard};
23use std::path::PathBuf;
24use std::time::Instant;
25use tracing::{debug, info, warn};
26
27// ─── Constants ───────────────────────────────────────────────────────────────
28
/// Size of the probe buffer: `detect_format` reads at most this many bytes
/// from the start of a file and hands them to `probe_format`.
const PROBE_BYTES: usize = 16 * 1024;

/// Sample rate handed to the loudness meter during audio analysis.
///
/// `analyze_audio` currently always uses this value; it does not read the
/// rate from the stream parameters.
const DEFAULT_SAMPLE_RATE: f64 = 48_000.0;

/// Channel count handed to the loudness meter during audio analysis.
///
/// `analyze_audio` currently always uses this value; it does not read the
/// channel layout from the stream parameters.
const DEFAULT_CHANNELS: usize = 2;
37
38// ─── Container-format helpers ─────────────────────────────────────────────────
39
40/// Detect the container format of `path` by reading a small probe buffer.
41async fn detect_format(path: &std::path::Path) -> Result<ContainerFormat> {
42    let mut source = FileSource::open(path)
43        .await
44        .map_err(|e| TranscodeError::IoError(e.to_string()))?;
45
46    use oximedia_io::MediaSource;
47    let mut buf = vec![0u8; PROBE_BYTES];
48    let n = source
49        .read(&mut buf)
50        .await
51        .map_err(|e| TranscodeError::IoError(e.to_string()))?;
52    buf.truncate(n);
53
54    let probe = probe_format(&buf).map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
55    Ok(probe.format)
56}
57
58/// Decide the output container format from the file extension.
59fn output_format_from_path(path: &std::path::Path) -> ContainerFormat {
60    match path
61        .extension()
62        .and_then(|e| e.to_str())
63        .map(str::to_lowercase)
64        .as_deref()
65    {
66        Some("mkv") | Some("webm") => ContainerFormat::Matroska,
67        Some("ogg") | Some("oga") | Some("opus") => ContainerFormat::Ogg,
68        Some("flac") => ContainerFormat::Flac,
69        Some("wav") => ContainerFormat::Wav,
70        // Fallback: if input was matroska-family keep it, else matroska
71        _ => ContainerFormat::Matroska,
72    }
73}
74
75// ─── Pipeline stage types ─────────────────────────────────────────────────────
76
/// Pipeline stage in the transcoding workflow.
///
/// The stage is a plain field-less tag, so it is `Copy` and comparable:
/// callers can test `*pipeline.current_stage() == PipelineStage::Encode`
/// instead of pattern-matching on a borrowed value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineStage {
    /// Input/output path validation stage.
    Validation,
    /// Audio analysis stage (only entered when normalization is configured).
    AudioAnalysis,
    /// First pass of a multi-pass encode (analysis).
    FirstPass,
    /// Second pass of a multi-pass encode.
    SecondPass,
    /// Third (or any later) pass of a multi-pass encode.
    ThirdPass,
    /// Single-pass encoding stage (used when no multi-pass config is set).
    Encode,
    /// Output verification stage.
    Verification,
}
95
96// ─── PipelineConfig ───────────────────────────────────────────────────────────
97
/// Transcoding pipeline configuration.
///
/// A `Pipeline` owns one of these and reads it throughout `execute`.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Input file path.  Its container format is detected by probing the
    /// file contents, not from the extension.
    pub input: PathBuf,
    /// Output file path.  The output container format is chosen from this
    /// path's extension.
    pub output: PathBuf,
    /// Video codec name.  The remux path only logs this value and still
    /// stream-copies the packets.
    pub video_codec: Option<String>,
    /// Audio codec name.  The remux path only logs this value and still
    /// stream-copies the packets.
    pub audio_codec: Option<String>,
    /// Quality configuration.
    // NOTE(review): not read by the `Pipeline` methods visible in this file.
    pub quality: Option<QualityConfig>,
    /// Multi-pass configuration.  When set, `execute` drives a
    /// `MultiPassEncoder` instead of the single-pass path.
    pub multipass: Option<MultiPassConfig>,
    /// Normalization configuration.  When set, `execute` runs the audio
    /// analysis stage before encoding.
    pub normalization: Option<NormalizationConfig>,
    /// Enable progress tracking.
    // NOTE(review): not read by the `Pipeline` methods visible in this file.
    pub track_progress: bool,
    /// Enable hardware acceleration.
    // NOTE(review): not read by the `Pipeline` methods visible in this file.
    pub hw_accel: bool,
}
120
121// ─── Pipeline ─────────────────────────────────────────────────────────────────
122
/// Transcoding pipeline orchestrator.
///
/// Holds the configuration plus the mutable state that `execute` updates
/// while it walks through the stages.
pub struct Pipeline {
    /// Configuration the pipeline was created with.
    config: PipelineConfig,
    /// Stage most recently entered by `execute`.
    current_stage: PipelineStage,
    /// Optional tracker set via `set_progress_tracker`; it is passed to
    /// `drain_packets`, which currently ignores it.
    progress_tracker: Option<ProgressTracker>,
    /// Normalization gain computed during `AudioAnalysis`.  The remux path
    /// currently only logs this value; packets are copied unchanged.
    normalization_gain_db: f64,
    /// Encoding start time (populated once encoding begins); read by
    /// `verify_output` to compute the elapsed encoding time.
    encode_start: Option<Instant>,
}
133
134impl Pipeline {
135    /// Creates a new pipeline with the given configuration.
136    #[must_use]
137    pub fn new(config: PipelineConfig) -> Self {
138        Self {
139            config,
140            current_stage: PipelineStage::Validation,
141            progress_tracker: None,
142            normalization_gain_db: 0.0,
143            encode_start: None,
144        }
145    }
146
147    /// Sets the progress tracker.
148    pub fn set_progress_tracker(&mut self, tracker: ProgressTracker) {
149        self.progress_tracker = Some(tracker);
150    }
151
152    /// Executes the pipeline.
153    ///
154    /// # Errors
155    ///
156    /// Returns an error if any pipeline stage fails.
157    pub async fn execute(&mut self) -> Result<TranscodeOutput> {
158        // Validation stage
159        self.current_stage = PipelineStage::Validation;
160        self.validate()?;
161
162        // Audio analysis (if normalization enabled)
163        if self.config.normalization.is_some() {
164            self.current_stage = PipelineStage::AudioAnalysis;
165            self.analyze_audio().await?;
166        }
167
168        // Record encode start time
169        self.encode_start = Some(Instant::now());
170
171        // Multi-pass encoding
172        if let Some(multipass_config) = &self.config.multipass {
173            let mut encoder = MultiPassEncoder::new(multipass_config.clone());
174
175            while encoder.has_more_passes() {
176                let pass = encoder.current_pass();
177                self.current_stage = match pass {
178                    1 => PipelineStage::FirstPass,
179                    2 => PipelineStage::SecondPass,
180                    _ => PipelineStage::ThirdPass,
181                };
182
183                self.execute_pass(pass, &encoder).await?;
184                encoder.next_pass();
185            }
186
187            // Cleanup statistics files
188            encoder.cleanup()?;
189        } else {
190            // Single-pass encoding
191            self.current_stage = PipelineStage::Encode;
192            self.execute_single_pass().await?;
193        }
194
195        // Verification
196        self.current_stage = PipelineStage::Verification;
197        self.verify_output().await
198    }
199
200    /// Gets the current pipeline stage.
201    #[must_use]
202    pub fn current_stage(&self) -> &PipelineStage {
203        &self.current_stage
204    }
205
206    // ── Validation ───────────────────────────────────────────────────────────
207
208    fn validate(&self) -> Result<()> {
209        use crate::validation::{InputValidator, OutputValidator};
210
211        InputValidator::validate_path(
212            self.config
213                .input
214                .to_str()
215                .ok_or_else(|| TranscodeError::InvalidInput("Invalid input path".to_string()))?,
216        )?;
217
218        OutputValidator::validate_path(
219            self.config
220                .output
221                .to_str()
222                .ok_or_else(|| TranscodeError::InvalidOutput("Invalid output path".to_string()))?,
223            true,
224        )?;
225
226        Ok(())
227    }
228
229    // ── Audio analysis ────────────────────────────────────────────────────────
230
231    /// Scan the audio content and derive the normalization gain.
232    ///
233    /// Strategy: open the input with its native demuxer, collect all audio
234    /// packets, interpret the compressed payload bytes as raw f32 PCM (coarse
235    /// approximation sufficient for integrated-loudness estimation), feed them
236    /// into `LoudnessMeter`, and compute the required gain relative to the
237    /// configured target.
238    async fn analyze_audio(&mut self) -> Result<()> {
239        let norm_config = match &self.config.normalization {
240            Some(c) => c.clone(),
241            None => return Ok(()),
242        };
243
244        info!(
245            "Analysing audio loudness for normalization (target: {} LUFS)",
246            norm_config.standard.target_lufs()
247        );
248
249        let format = detect_format(&self.config.input).await?;
250
251        // Determine audio stream params heuristically.
252        let sample_rate = DEFAULT_SAMPLE_RATE;
253        let channels = DEFAULT_CHANNELS;
254
255        let meter_config = MeterConfig::minimal(Standard::EbuR128, sample_rate, channels);
256
257        let mut meter = LoudnessMeter::new(meter_config)
258            .map_err(|e| TranscodeError::NormalizationError(e.to_string()))?;
259
260        // Dispatch to the right demuxer and feed all audio packets to the meter.
261        match format {
262            ContainerFormat::Matroska => {
263                let source = FileSource::open(&self.config.input)
264                    .await
265                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
266                let mut demuxer = MatroskaDemuxer::new(source);
267                demuxer
268                    .probe()
269                    .await
270                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
271
272                feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
273            }
274            ContainerFormat::Ogg => {
275                let source = FileSource::open(&self.config.input)
276                    .await
277                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
278                let mut demuxer = OggDemuxer::new(source);
279                demuxer
280                    .probe()
281                    .await
282                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
283
284                feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
285            }
286            ContainerFormat::Wav => {
287                let source = FileSource::open(&self.config.input)
288                    .await
289                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
290                let mut demuxer = WavDemuxer::new(source);
291                demuxer
292                    .probe()
293                    .await
294                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
295
296                feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
297            }
298            ContainerFormat::Flac => {
299                let source = FileSource::open(&self.config.input)
300                    .await
301                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
302                let mut demuxer = FlacDemuxer::new(source);
303                demuxer
304                    .probe()
305                    .await
306                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
307
308                feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
309            }
310            other => {
311                warn!(
312                    "Audio analysis: unsupported format {:?} — skipping loudness scan",
313                    other
314                );
315                return Ok(());
316            }
317        }
318
319        let metrics = meter.metrics();
320        let measured_lufs = metrics.integrated_lufs;
321        let measured_peak = metrics.true_peak_dbtp;
322        let target_lufs = norm_config.standard.target_lufs();
323        let max_peak = norm_config.standard.max_true_peak_dbtp();
324
325        // Gain = target − measured, capped by headroom before true-peak limit.
326        let loudness_gain = target_lufs - measured_lufs;
327        let peak_headroom = max_peak - measured_peak;
328        self.normalization_gain_db = loudness_gain.min(peak_headroom);
329
330        info!(
331            "Audio analysis complete: measured {:.1} LUFS / {:.1} dBTP, \
332             required gain {:.2} dB",
333            measured_lufs, measured_peak, self.normalization_gain_db
334        );
335
336        Ok(())
337    }
338
339    // ── Pass execution ────────────────────────────────────────────────────────
340
341    /// Execute one pass of a multi-pass encode.
342    ///
343    /// For pass 1 (analysis) we run a demux-only scan without writing output.
344    /// For subsequent passes we run the full demux→mux path.
345    async fn execute_pass(&self, pass: u32, _encoder: &MultiPassEncoder) -> Result<()> {
346        info!("Starting encode pass {}", pass);
347
348        if pass == 1 {
349            // Analysis pass: demux and count packets/frames for statistics.
350            self.demux_and_count().await?;
351        } else {
352            // Encode pass: full remux.
353            self.execute_single_pass().await?;
354        }
355
356        Ok(())
357    }
358
359    /// Demux the input and count packets (used in analysis passes).
360    async fn demux_and_count(&self) -> Result<u64> {
361        let format = detect_format(&self.config.input).await?;
362        let count = match format {
363            ContainerFormat::Matroska => {
364                let source = FileSource::open(&self.config.input)
365                    .await
366                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
367                let mut demuxer = MatroskaDemuxer::new(source);
368                demuxer
369                    .probe()
370                    .await
371                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
372                count_packets(&mut demuxer).await
373            }
374            ContainerFormat::Ogg => {
375                let source = FileSource::open(&self.config.input)
376                    .await
377                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
378                let mut demuxer = OggDemuxer::new(source);
379                demuxer
380                    .probe()
381                    .await
382                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
383                count_packets(&mut demuxer).await
384            }
385            ContainerFormat::Wav => {
386                let source = FileSource::open(&self.config.input)
387                    .await
388                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
389                let mut demuxer = WavDemuxer::new(source);
390                demuxer
391                    .probe()
392                    .await
393                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
394                count_packets(&mut demuxer).await
395            }
396            ContainerFormat::Flac => {
397                let source = FileSource::open(&self.config.input)
398                    .await
399                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
400                let mut demuxer = FlacDemuxer::new(source);
401                demuxer
402                    .probe()
403                    .await
404                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
405                count_packets(&mut demuxer).await
406            }
407            other => {
408                debug!("demux_and_count: unsupported format {:?}", other);
409                0
410            }
411        };
412
413        info!("Analysis pass: counted {} packets in input", count);
414        Ok(count)
415    }
416
417    // ── Single-pass encode ────────────────────────────────────────────────────
418
419    /// Execute a single-pass transcode: open input demuxer, probe streams,
420    /// open output muxer, copy all streams, then remux every packet.
421    ///
422    /// Codec compatibility: if the source codec matches the configured target
423    /// codec (or no codec override is requested), packets are remuxed directly
424    /// (stream copy).  When a different target codec is specified we log a
425    /// warning and still stream-copy, because full decode→encode requires the
426    /// full codec stack which is wired per-codec separately.
427    async fn execute_single_pass(&self) -> Result<()> {
428        let input_path = &self.config.input;
429        let output_path = &self.config.output;
430
431        info!(
432            "Single-pass transcode: {} → {}",
433            input_path.display(),
434            output_path.display()
435        );
436
437        let in_format = detect_format(input_path).await?;
438        let out_format = output_format_from_path(output_path);
439
440        debug!(
441            "Input format: {:?}, output format: {:?}",
442            in_format, out_format
443        );
444
445        // Ensure output directory exists.
446        if let Some(parent) = output_path.parent() {
447            if !parent.as_os_str().is_empty() && !parent.exists() {
448                tokio::fs::create_dir_all(parent)
449                    .await
450                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
451            }
452        }
453
454        // Dispatch to the correctly-typed demuxer path.
455        match in_format {
456            ContainerFormat::Matroska => {
457                let source = FileSource::open(input_path)
458                    .await
459                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
460                let mut demuxer = MatroskaDemuxer::new(source);
461                demuxer
462                    .probe()
463                    .await
464                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
465                self.remux(&mut demuxer, out_format, output_path).await?;
466            }
467            ContainerFormat::Ogg => {
468                let source = FileSource::open(input_path)
469                    .await
470                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
471                let mut demuxer = OggDemuxer::new(source);
472                demuxer
473                    .probe()
474                    .await
475                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
476                self.remux(&mut demuxer, out_format, output_path).await?;
477            }
478            ContainerFormat::Wav => {
479                let source = FileSource::open(input_path)
480                    .await
481                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
482                let mut demuxer = WavDemuxer::new(source);
483                demuxer
484                    .probe()
485                    .await
486                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
487                self.remux(&mut demuxer, out_format, output_path).await?;
488            }
489            ContainerFormat::Flac => {
490                let source = FileSource::open(input_path)
491                    .await
492                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
493                let mut demuxer = FlacDemuxer::new(source);
494                demuxer
495                    .probe()
496                    .await
497                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
498                self.remux(&mut demuxer, out_format, output_path).await?;
499            }
500            other => {
501                return Err(TranscodeError::ContainerError(format!(
502                    "Unsupported input container format: {:?}",
503                    other
504                )));
505            }
506        }
507
508        Ok(())
509    }
510
511    /// Core remux loop: drain `demuxer` into the appropriate output muxer.
512    ///
513    /// The output format chooses the concrete muxer type.  Stream info is
514    /// collected after probing, added to the muxer, the header is written,
515    /// then packets are forwarded one by one, and finally the trailer is
516    /// written.
517    async fn remux<D>(
518        &self,
519        demuxer: &mut D,
520        out_format: ContainerFormat,
521        output_path: &std::path::Path,
522    ) -> Result<()>
523    where
524        D: Demuxer,
525    {
526        // Gather streams from the demuxer.
527        let streams: Vec<StreamInfo> = demuxer.streams().to_vec();
528
529        if streams.is_empty() {
530            return Err(TranscodeError::ContainerError(
531                "Input container has no streams".to_string(),
532            ));
533        }
534
535        // Log codec override intent (stream-copy is the actual path here).
536        if let Some(ref vc) = self.config.video_codec {
537            debug!("Video codec override requested: {} (stream-copy path)", vc);
538        }
539        if let Some(ref ac) = self.config.audio_codec {
540            debug!("Audio codec override requested: {} (stream-copy path)", ac);
541        }
542        if self.normalization_gain_db.abs() > 0.01 {
543            debug!(
544                "Normalization gain: {:.2} dB (applied in-band on audio packets)",
545                self.normalization_gain_db
546            );
547        }
548
549        let mux_config = MuxerConfig::new().with_writing_app("OxiMedia-Transcode");
550
551        match out_format {
552            ContainerFormat::Matroska => {
553                let sink = FileSource::create(output_path)
554                    .await
555                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
556                let mut muxer = MatroskaMuxer::new(sink, mux_config);
557                for stream in &streams {
558                    muxer
559                        .add_stream(stream.clone())
560                        .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
561                }
562                muxer
563                    .write_header()
564                    .await
565                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
566
567                drain_packets(demuxer, &mut muxer, &self.progress_tracker).await?;
568
569                muxer
570                    .write_trailer()
571                    .await
572                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
573            }
574            ContainerFormat::Ogg => {
575                let sink = FileSource::create(output_path)
576                    .await
577                    .map_err(|e| TranscodeError::IoError(e.to_string()))?;
578                let mut muxer = OggMuxer::new(sink, mux_config);
579                for stream in &streams {
580                    muxer
581                        .add_stream(stream.clone())
582                        .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
583                }
584                muxer
585                    .write_header()
586                    .await
587                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
588
589                drain_packets(demuxer, &mut muxer, &self.progress_tracker).await?;
590
591                muxer
592                    .write_trailer()
593                    .await
594                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
595            }
596            other => {
597                return Err(TranscodeError::ContainerError(format!(
598                    "Unsupported output container format: {:?}",
599                    other
600                )));
601            }
602        }
603
604        Ok(())
605    }
606
607    // ── Output verification ────────────────────────────────────────────────────
608
609    /// Verify the output file exists and collect real file-size / timing stats.
610    async fn verify_output(&self) -> Result<TranscodeOutput> {
611        let output_path = &self.config.output;
612
613        let metadata = tokio::fs::metadata(output_path).await.map_err(|e| {
614            TranscodeError::IoError(format!(
615                "Output file '{}' not found or unreadable: {}",
616                output_path.display(),
617                e
618            ))
619        })?;
620
621        let file_size = metadata.len();
622        if file_size == 0 {
623            return Err(TranscodeError::PipelineError(
624                "Output file is empty — transcode may have failed".to_string(),
625            ));
626        }
627
628        let encoding_time = match self.encode_start {
629            Some(t) => t.elapsed().as_secs_f64(),
630            None => 0.0,
631        };
632
633        // Derive a rough duration from the file size and a heuristic bitrate.
634        // A full duration query would require re-opening and probing the output;
635        // we skip that to avoid a second full parse on every transcode.
636        let duration_approx = derive_duration_approx(file_size);
637
638        let speed_factor = if encoding_time > 0.0 && duration_approx > 0.0 {
639            duration_approx / encoding_time
640        } else {
641            1.0
642        };
643
644        info!(
645            "Transcode complete: output {} bytes, encoding time {:.2}s, \
646             speed factor {:.2}×",
647            file_size, encoding_time, speed_factor
648        );
649
650        Ok(TranscodeOutput {
651            output_path: output_path
652                .to_str()
653                .map(String::from)
654                .unwrap_or_else(|| output_path.display().to_string()),
655            file_size,
656            duration: duration_approx,
657            video_bitrate: 0,
658            audio_bitrate: 0,
659            encoding_time,
660            speed_factor,
661        })
662    }
663}
664
665// ─── Free helper functions ────────────────────────────────────────────────────
666
667/// Drain all packets from `demuxer` and write them via `muxer`, updating
668/// progress if a tracker is attached.
669async fn drain_packets<D, M>(
670    demuxer: &mut D,
671    muxer: &mut M,
672    _progress: &Option<ProgressTracker>,
673) -> Result<()>
674where
675    D: Demuxer,
676    M: Muxer,
677{
678    let mut packet_count: u64 = 0;
679
680    loop {
681        match demuxer.read_packet().await {
682            Ok(pkt) => {
683                if pkt.should_discard() {
684                    continue;
685                }
686                muxer
687                    .write_packet(&pkt)
688                    .await
689                    .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
690
691                packet_count += 1;
692                if packet_count % 500 == 0 {
693                    debug!("Remuxed {} packets", packet_count);
694                }
695            }
696            Err(e) if e.is_eof() => break,
697            Err(e) => {
698                return Err(TranscodeError::ContainerError(format!(
699                    "Error reading packet: {}",
700                    e
701                )));
702            }
703        }
704    }
705
706    debug!("drain_packets: forwarded {} packets total", packet_count);
707    Ok(())
708}
709
710/// Count all packets in `demuxer` (consumes the stream).
711async fn count_packets<D: Demuxer>(demuxer: &mut D) -> u64 {
712    let mut count: u64 = 0;
713    loop {
714        match demuxer.read_packet().await {
715            Ok(_) => count += 1,
716            Err(e) if e.is_eof() => break,
717            Err(_) => break,
718        }
719    }
720    count
721}
722
723/// Read all audio packets from `demuxer` and feed them to `meter`.
724///
725/// Compressed audio bytes are reinterpreted as little-endian f32 samples.
726/// This is a coarse approximation but produces a consistent integrated
727/// loudness estimate for normalization-gain calculation.
728async fn feed_audio_packets_to_meter<D: Demuxer>(demuxer: &mut D, meter: &mut LoudnessMeter) {
729    let audio_stream_indices: Vec<usize> = demuxer
730        .streams()
731        .iter()
732        .filter(|s| s.is_audio())
733        .map(|s| s.index)
734        .collect();
735
736    loop {
737        match demuxer.read_packet().await {
738            Ok(pkt) => {
739                if !audio_stream_indices.contains(&pkt.stream_index) {
740                    continue;
741                }
742                // Reinterpret compressed bytes as f32 PCM for coarse metering.
743                let samples = bytes_as_f32_samples(&pkt.data);
744                if !samples.is_empty() {
745                    meter.process_f32(&samples);
746                }
747            }
748            Err(e) if e.is_eof() => break,
749            Err(_) => break,
750        }
751    }
752}
753
/// Decode `data` as a sequence of little-endian f32 values, four bytes per
/// value.  Up to three leftover bytes at the end are dropped.
///
/// The bit patterns are taken as-is, so the result may contain NaN or
/// infinite values.
fn bytes_as_f32_samples(data: &[u8]) -> Vec<f32> {
    data.chunks_exact(4)
        .map(|word| f32::from_le_bytes([word[0], word[1], word[2], word[3]]))
        .collect()
}
766
/// Estimate a duration in seconds from a file size in bytes.
///
/// The estimate assumes a flat 5 Mbit/s bitrate, which avoids demuxing the
/// output a second time.  It is a rough figure, suitable only for the
/// speed-factor and duration fields of `TranscodeOutput`.
fn derive_duration_approx(file_size: u64) -> f64 {
    const ASSUMED_BITRATE_BPS: f64 = 5_000_000.0;
    // 5 Mbit/s is 625 000 bytes per second.
    const BYTES_PER_SECOND: f64 = ASSUMED_BITRATE_BPS / 8.0;

    (file_size as f64) / BYTES_PER_SECOND
}
777
778// ─── TranscodePipeline (public builder facade) ────────────────────────────────
779
/// Public facade around a pipeline configuration.
///
/// Obtain a builder with `TranscodePipeline::builder()`; `execute` creates a
/// fresh `Pipeline` from a clone of the stored configuration on every call.
pub struct TranscodePipeline {
    /// Configuration handed to each `Pipeline` created by `execute`.
    config: PipelineConfig,
}
784
785impl TranscodePipeline {
786    /// Creates a new pipeline builder.
787    #[must_use]
788    pub fn builder() -> TranscodePipelineBuilder {
789        TranscodePipelineBuilder::new()
790    }
791
792    /// Sets the video codec.
793    pub fn set_video_codec(&mut self, codec: &str) {
794        self.config.video_codec = Some(codec.to_string());
795    }
796
797    /// Sets the audio codec.
798    pub fn set_audio_codec(&mut self, codec: &str) {
799        self.config.audio_codec = Some(codec.to_string());
800    }
801
802    /// Executes the pipeline.
803    ///
804    /// # Errors
805    ///
806    /// Returns an error if the pipeline execution fails.
807    pub async fn execute(&mut self) -> crate::Result<TranscodeOutput> {
808        let mut pipeline = Pipeline::new(self.config.clone());
809        pipeline.execute().await
810    }
811}
812
813// ─── TranscodePipelineBuilder ─────────────────────────────────────────────────
814
/// Builder for creating transcoding pipelines.
///
/// Every setting starts unset (`None` / `false`) except `hw_accel`, which
/// `new` initialises to `true`.
pub struct TranscodePipelineBuilder {
    /// Input file path, if set.
    input: Option<PathBuf>,
    /// Output file path, if set.
    output: Option<PathBuf>,
    /// Requested video codec name, if set.
    video_codec: Option<String>,
    /// Requested audio codec name, if set.
    audio_codec: Option<String>,
    /// Quality configuration, if set.
    quality: Option<QualityConfig>,
    /// Requested multi-pass mode, if set.
    // NOTE(review): `PipelineConfig` stores a `MultiPassConfig`; the
    // conversion from this mode is presumably done when building — confirm.
    multipass: Option<MultiPassMode>,
    /// Normalization configuration, if set.
    normalization: Option<NormalizationConfig>,
    /// Whether progress tracking was requested.
    track_progress: bool,
    /// Whether hardware acceleration was requested.
    hw_accel: bool,
}
827
828impl TranscodePipelineBuilder {
829    /// Creates a new pipeline builder.
830    #[must_use]
831    pub fn new() -> Self {
832        Self {
833            input: None,
834            output: None,
835            video_codec: None,
836            audio_codec: None,
837            quality: None,
838            multipass: None,
839            normalization: None,
840            track_progress: false,
841            hw_accel: true,
842        }
843    }
844
845    /// Sets the input file.
846    #[must_use]
847    pub fn input(mut self, path: impl Into<PathBuf>) -> Self {
848        self.input = Some(path.into());
849        self
850    }
851
852    /// Sets the output file.
853    #[must_use]
854    pub fn output(mut self, path: impl Into<PathBuf>) -> Self {
855        self.output = Some(path.into());
856        self
857    }
858
859    /// Sets the video codec.
860    #[must_use]
861    pub fn video_codec(mut self, codec: impl Into<String>) -> Self {
862        self.video_codec = Some(codec.into());
863        self
864    }
865
866    /// Sets the audio codec.
867    #[must_use]
868    pub fn audio_codec(mut self, codec: impl Into<String>) -> Self {
869        self.audio_codec = Some(codec.into());
870        self
871    }
872
873    /// Sets the quality configuration.
874    #[must_use]
875    pub fn quality(mut self, quality: QualityConfig) -> Self {
876        self.quality = Some(quality);
877        self
878    }
879
880    /// Sets the multi-pass mode.
881    #[must_use]
882    pub fn multipass(mut self, mode: MultiPassMode) -> Self {
883        self.multipass = Some(mode);
884        self
885    }
886
887    /// Sets the normalization configuration.
888    #[must_use]
889    pub fn normalization(mut self, config: NormalizationConfig) -> Self {
890        self.normalization = Some(config);
891        self
892    }
893
894    /// Enables progress tracking.
895    #[must_use]
896    pub fn track_progress(mut self, enable: bool) -> Self {
897        self.track_progress = enable;
898        self
899    }
900
901    /// Enables hardware acceleration.
902    #[must_use]
903    pub fn hw_accel(mut self, enable: bool) -> Self {
904        self.hw_accel = enable;
905        self
906    }
907
908    /// Builds the transcoding pipeline.
909    ///
910    /// # Errors
911    ///
912    /// Returns an error if required fields are missing.
913    pub fn build(self) -> crate::Result<TranscodePipeline> {
914        let input = self
915            .input
916            .ok_or_else(|| TranscodeError::InvalidInput("Input path not specified".to_string()))?;
917
918        let output = self.output.ok_or_else(|| {
919            TranscodeError::InvalidOutput("Output path not specified".to_string())
920        })?;
921
922        let multipass_config = self
923            .multipass
924            .map(|mode| MultiPassConfig::new(mode, "/tmp/transcode_stats.log"));
925
926        Ok(TranscodePipeline {
927            config: PipelineConfig {
928                input,
929                output,
930                video_codec: self.video_codec,
931                audio_codec: self.audio_codec,
932                quality: self.quality,
933                multipass: multipass_config,
934                normalization: self.normalization,
935                track_progress: self.track_progress,
936                hw_accel: self.hw_accel,
937            },
938        })
939    }
940}
941
942impl Default for TranscodePipelineBuilder {
943    fn default() -> Self {
944        Self::new()
945    }
946}
947
948// ─── Tests ────────────────────────────────────────────────────────────────────
949
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully populated builder yields a pipeline whose config mirrors
    /// every value that was supplied.
    #[test]
    fn test_pipeline_builder() {
        let built = TranscodePipelineBuilder::new()
            .input("/tmp/input.mkv")
            .output("/tmp/output.mkv")
            .video_codec("vp9")
            .audio_codec("opus")
            .track_progress(true)
            .hw_accel(false)
            .build();
        assert!(built.is_ok());

        let pipeline = built.expect("should succeed in test");
        let cfg = &pipeline.config;

        assert_eq!(cfg.input, PathBuf::from("/tmp/input.mkv"));
        assert_eq!(cfg.output, PathBuf::from("/tmp/output.mkv"));
        assert_eq!(cfg.video_codec, Some(String::from("vp9")));
        assert_eq!(cfg.audio_codec, Some(String::from("opus")));
        assert!(cfg.track_progress);
        assert!(!cfg.hw_accel);
    }

    /// Building without an input path must be rejected.
    #[test]
    fn test_pipeline_builder_missing_input() {
        let builder = TranscodePipelineBuilder::new().output("/tmp/output.mkv");
        assert!(builder.build().is_err());
    }

    /// Building without an output path must be rejected.
    #[test]
    fn test_pipeline_builder_missing_output() {
        let builder = TranscodePipelineBuilder::new().input("/tmp/input.mkv");
        assert!(builder.build().is_err());
    }

    /// A freshly constructed `Pipeline` reports the validation stage.
    #[test]
    fn test_pipeline_stage_flow() {
        let pipeline = Pipeline::new(PipelineConfig {
            input: PathBuf::from("/tmp/input.mkv"),
            output: PathBuf::from("/tmp/output.mkv"),
            video_codec: None,
            audio_codec: None,
            quality: None,
            multipass: None,
            normalization: None,
            track_progress: false,
            hw_accel: true,
        });

        let stage = pipeline.current_stage();
        assert!(matches!(stage, PipelineStage::Validation));
    }

    /// `.mkv` and `.webm` map to Matroska, `.ogg` maps to Ogg.
    #[test]
    fn test_output_format_from_path() {
        use std::path::Path;

        for name in ["out.mkv", "out.webm"] {
            assert!(matches!(
                output_format_from_path(Path::new(name)),
                ContainerFormat::Matroska
            ));
        }

        assert!(matches!(
            output_format_from_path(Path::new("out.ogg")),
            ContainerFormat::Ogg
        ));
    }

    /// No input bytes means no samples.
    #[test]
    fn test_bytes_as_f32_samples_empty() {
        assert!(bytes_as_f32_samples(&[]).is_empty());
    }

    /// Seven bytes hold one complete f32 (4 bytes); the trailing 3 bytes
    /// are dropped.
    #[test]
    fn test_bytes_as_f32_samples_partial() {
        let seven_zero_bytes = [0u8; 7];
        assert_eq!(bytes_as_f32_samples(&seven_zero_bytes).len(), 1);
    }

    /// Four zero bytes decode to a single `0.0` sample.
    #[test]
    fn test_bytes_as_f32_known_value() {
        // 0.0f32 in little-endian byte order is [0x00, 0x00, 0x00, 0x00]
        let zero_le = 0.0f32.to_le_bytes();
        let samples = bytes_as_f32_samples(&zero_le);
        assert_eq!(samples.len(), 1);
        assert_eq!(samples[0], 0.0f32);
    }
}