1use crate::{
25 MultiPassConfig, MultiPassEncoder, MultiPassMode, NormalizationConfig, ProgressTracker,
26 QualityConfig, Result, TranscodeError, TranscodeOutput,
27};
28use oximedia_container::{
29 demux::{Demuxer, FlacDemuxer, MatroskaDemuxer, OggDemuxer, WavDemuxer},
30 mux::{MatroskaMuxer, MuxerConfig, OggMuxer},
31 probe_format, ContainerFormat, Muxer, StreamInfo,
32};
33use oximedia_io::FileSource;
34use oximedia_metering::{LoudnessMeter, MeterConfig, Standard};
35use std::path::PathBuf;
36use std::time::Instant;
37use tracing::{debug, info, warn};
38
39const PROBE_BYTES: usize = 16 * 1024;
43
44const DEFAULT_SAMPLE_RATE: f64 = 48_000.0;
46
47const DEFAULT_CHANNELS: usize = 2;
49
50#[derive(Debug, Clone, Default)]
57struct PassStats {
58 bytes_in: u64,
60 bytes_out: u64,
62 video_frames: u64,
64 audio_frames: u64,
66}
67
68async fn detect_format(path: &std::path::Path) -> Result<ContainerFormat> {
72 let mut source = FileSource::open(path)
73 .await
74 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
75
76 use oximedia_io::MediaSource;
77 let mut buf = vec![0u8; PROBE_BYTES];
78 let n = source
79 .read(&mut buf)
80 .await
81 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
82 buf.truncate(n);
83
84 let probe = probe_format(&buf).map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
85 Ok(probe.format)
86}
87
88fn output_format_from_path(path: &std::path::Path) -> ContainerFormat {
90 match path
91 .extension()
92 .and_then(|e| e.to_str())
93 .map(str::to_lowercase)
94 .as_deref()
95 {
96 Some("mkv") | Some("webm") => ContainerFormat::Matroska,
97 Some("ogg") | Some("oga") | Some("opus") => ContainerFormat::Ogg,
98 Some("flac") => ContainerFormat::Flac,
99 Some("wav") => ContainerFormat::Wav,
100 _ => ContainerFormat::Matroska,
102 }
103}
104
105#[derive(Debug, Clone)]
109pub enum PipelineStage {
110 Validation,
112 AudioAnalysis,
114 FirstPass,
116 SecondPass,
118 ThirdPass,
120 Encode,
122 Verification,
124}
125
126#[derive(Debug, Clone)]
130pub struct PipelineConfig {
131 pub input: PathBuf,
133 pub output: PathBuf,
135 pub video_codec: Option<String>,
137 pub audio_codec: Option<String>,
139 pub quality: Option<QualityConfig>,
141 pub multipass: Option<MultiPassConfig>,
143 pub normalization: Option<NormalizationConfig>,
145 pub track_progress: bool,
147 pub hw_accel: bool,
149}
150
151pub struct Pipeline {
155 config: PipelineConfig,
156 current_stage: PipelineStage,
157 progress_tracker: Option<ProgressTracker>,
158 normalization_gain_db: f64,
160 encode_start: Option<Instant>,
162 accumulated_stats: PassStats,
164}
165
166impl Pipeline {
167 #[must_use]
169 pub fn new(config: PipelineConfig) -> Self {
170 Self {
171 config,
172 current_stage: PipelineStage::Validation,
173 progress_tracker: None,
174 normalization_gain_db: 0.0,
175 encode_start: None,
176 accumulated_stats: PassStats::default(),
177 }
178 }
179
180 pub fn set_progress_tracker(&mut self, tracker: ProgressTracker) {
182 self.progress_tracker = Some(tracker);
183 }
184
185 pub async fn execute(&mut self) -> Result<TranscodeOutput> {
191 self.current_stage = PipelineStage::Validation;
193 self.validate()?;
194
195 if self.config.normalization.is_some() {
197 self.current_stage = PipelineStage::AudioAnalysis;
198 self.analyze_audio().await?;
199 }
200
201 self.encode_start = Some(Instant::now());
203
204 if let Some(multipass_config) = &self.config.multipass {
206 let mut encoder = MultiPassEncoder::new(multipass_config.clone());
207
208 while encoder.has_more_passes() {
209 let pass = encoder.current_pass();
210 self.current_stage = match pass {
211 1 => PipelineStage::FirstPass,
212 2 => PipelineStage::SecondPass,
213 _ => PipelineStage::ThirdPass,
214 };
215
216 self.execute_pass(pass, &encoder).await?;
217 encoder.next_pass();
218 }
219
220 encoder.cleanup()?;
222 } else {
223 self.current_stage = PipelineStage::Encode;
225 let stats = self.execute_single_pass().await?;
226 self.accumulated_stats.bytes_in += stats.bytes_in;
227 self.accumulated_stats.bytes_out += stats.bytes_out;
228 self.accumulated_stats.video_frames += stats.video_frames;
229 self.accumulated_stats.audio_frames += stats.audio_frames;
230 }
231
232 self.current_stage = PipelineStage::Verification;
234 self.verify_output().await
235 }
236
237 #[must_use]
239 pub fn current_stage(&self) -> &PipelineStage {
240 &self.current_stage
241 }
242
243 fn validate(&self) -> Result<()> {
246 use crate::validation::{InputValidator, OutputValidator};
247
248 InputValidator::validate_path(
249 self.config
250 .input
251 .to_str()
252 .ok_or_else(|| TranscodeError::InvalidInput("Invalid input path".to_string()))?,
253 )?;
254
255 OutputValidator::validate_path(
256 self.config
257 .output
258 .to_str()
259 .ok_or_else(|| TranscodeError::InvalidOutput("Invalid output path".to_string()))?,
260 true,
261 )?;
262
263 Ok(())
264 }
265
266 async fn analyze_audio(&mut self) -> Result<()> {
276 let norm_config = match &self.config.normalization {
277 Some(c) => c.clone(),
278 None => return Ok(()),
279 };
280
281 info!(
282 "Analysing audio loudness for normalization (target: {} LUFS)",
283 norm_config.standard.target_lufs()
284 );
285
286 let format = detect_format(&self.config.input).await?;
287
288 let sample_rate = DEFAULT_SAMPLE_RATE;
290 let channels = DEFAULT_CHANNELS;
291
292 let meter_config = MeterConfig::minimal(Standard::EbuR128, sample_rate, channels);
293
294 let mut meter = LoudnessMeter::new(meter_config)
295 .map_err(|e| TranscodeError::NormalizationError(e.to_string()))?;
296
297 match format {
299 ContainerFormat::Matroska => {
300 let source = FileSource::open(&self.config.input)
301 .await
302 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
303 let mut demuxer = MatroskaDemuxer::new(source);
304 demuxer
305 .probe()
306 .await
307 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
308
309 feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
310 }
311 ContainerFormat::Ogg => {
312 let source = FileSource::open(&self.config.input)
313 .await
314 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
315 let mut demuxer = OggDemuxer::new(source);
316 demuxer
317 .probe()
318 .await
319 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
320
321 feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
322 }
323 ContainerFormat::Wav => {
324 let source = FileSource::open(&self.config.input)
325 .await
326 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
327 let mut demuxer = WavDemuxer::new(source);
328 demuxer
329 .probe()
330 .await
331 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
332
333 feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
334 }
335 ContainerFormat::Flac => {
336 let source = FileSource::open(&self.config.input)
337 .await
338 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
339 let mut demuxer = FlacDemuxer::new(source);
340 demuxer
341 .probe()
342 .await
343 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
344
345 feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
346 }
347 other => {
348 warn!(
349 "Audio analysis: unsupported format {:?} — skipping loudness scan",
350 other
351 );
352 return Ok(());
353 }
354 }
355
356 let metrics = meter.metrics();
357 let measured_lufs = metrics.integrated_lufs;
358 let measured_peak = metrics.true_peak_dbtp;
359 let target_lufs = norm_config.standard.target_lufs();
360 let max_peak = norm_config.standard.max_true_peak_dbtp();
361
362 let loudness_gain = target_lufs - measured_lufs;
364 let peak_headroom = max_peak - measured_peak;
365 self.normalization_gain_db = loudness_gain.min(peak_headroom);
366
367 info!(
368 "Audio analysis complete: measured {:.1} LUFS / {:.1} dBTP, \
369 required gain {:.2} dB",
370 measured_lufs, measured_peak, self.normalization_gain_db
371 );
372
373 Ok(())
374 }
375
376 async fn execute_pass(&mut self, pass: u32, _encoder: &MultiPassEncoder) -> Result<()> {
383 info!("Starting encode pass {}", pass);
384
385 if pass == 1 {
386 self.demux_and_count().await?;
388 } else {
389 let stats = self.execute_single_pass().await?;
391 self.accumulated_stats.bytes_in += stats.bytes_in;
392 self.accumulated_stats.bytes_out += stats.bytes_out;
393 self.accumulated_stats.video_frames += stats.video_frames;
394 self.accumulated_stats.audio_frames += stats.audio_frames;
395 }
396
397 Ok(())
398 }
399
400 async fn demux_and_count(&self) -> Result<u64> {
402 let format = detect_format(&self.config.input).await?;
403 let count = match format {
404 ContainerFormat::Matroska => {
405 let source = FileSource::open(&self.config.input)
406 .await
407 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
408 let mut demuxer = MatroskaDemuxer::new(source);
409 demuxer
410 .probe()
411 .await
412 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
413 count_packets(&mut demuxer).await
414 }
415 ContainerFormat::Ogg => {
416 let source = FileSource::open(&self.config.input)
417 .await
418 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
419 let mut demuxer = OggDemuxer::new(source);
420 demuxer
421 .probe()
422 .await
423 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
424 count_packets(&mut demuxer).await
425 }
426 ContainerFormat::Wav => {
427 let source = FileSource::open(&self.config.input)
428 .await
429 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
430 let mut demuxer = WavDemuxer::new(source);
431 demuxer
432 .probe()
433 .await
434 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
435 count_packets(&mut demuxer).await
436 }
437 ContainerFormat::Flac => {
438 let source = FileSource::open(&self.config.input)
439 .await
440 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
441 let mut demuxer = FlacDemuxer::new(source);
442 demuxer
443 .probe()
444 .await
445 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
446 count_packets(&mut demuxer).await
447 }
448 other => {
449 debug!("demux_and_count: unsupported format {:?}", other);
450 0
451 }
452 };
453
454 info!("Analysis pass: counted {} packets in input", count);
455 Ok(count)
456 }
457
458 async fn execute_single_pass(&self) -> Result<PassStats> {
469 let input_path = &self.config.input;
470 let output_path = &self.config.output;
471
472 info!(
473 "Single-pass transcode: {} → {}",
474 input_path.display(),
475 output_path.display()
476 );
477
478 let in_format = detect_format(input_path).await?;
479 let out_format = output_format_from_path(output_path);
480
481 debug!(
482 "Input format: {:?}, output format: {:?}",
483 in_format, out_format
484 );
485
486 if let Some(parent) = output_path.parent() {
488 if !parent.as_os_str().is_empty() && !parent.exists() {
489 #[cfg(not(target_arch = "wasm32"))]
490 {
491 tokio::fs::create_dir_all(parent)
492 .await
493 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
494 }
495 #[cfg(target_arch = "wasm32")]
496 {
497 return Err(TranscodeError::IoError(
498 "Filesystem operations not supported on wasm32".to_string(),
499 ));
500 }
501 }
502 }
503
504 let stats = match in_format {
506 ContainerFormat::Matroska => {
507 let source = FileSource::open(input_path)
508 .await
509 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
510 let mut demuxer = MatroskaDemuxer::new(source);
511 demuxer
512 .probe()
513 .await
514 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
515 self.remux(&mut demuxer, out_format, output_path).await?
516 }
517 ContainerFormat::Ogg => {
518 let source = FileSource::open(input_path)
519 .await
520 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
521 let mut demuxer = OggDemuxer::new(source);
522 demuxer
523 .probe()
524 .await
525 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
526 self.remux(&mut demuxer, out_format, output_path).await?
527 }
528 ContainerFormat::Wav => {
529 let source = FileSource::open(input_path)
530 .await
531 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
532 let mut demuxer = WavDemuxer::new(source);
533 demuxer
534 .probe()
535 .await
536 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
537 self.remux(&mut demuxer, out_format, output_path).await?
538 }
539 ContainerFormat::Flac => {
540 let source = FileSource::open(input_path)
541 .await
542 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
543 let mut demuxer = FlacDemuxer::new(source);
544 demuxer
545 .probe()
546 .await
547 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
548 self.remux(&mut demuxer, out_format, output_path).await?
549 }
550 other => {
551 return Err(TranscodeError::ContainerError(format!(
552 "Unsupported input container format: {:?}",
553 other
554 )));
555 }
556 };
557
558 Ok(stats)
559 }
560
561 async fn remux<D>(
574 &self,
575 demuxer: &mut D,
576 out_format: ContainerFormat,
577 output_path: &std::path::Path,
578 ) -> Result<PassStats>
579 where
580 D: Demuxer,
581 {
582 let streams: Vec<StreamInfo> = demuxer.streams().to_vec();
584
585 if streams.is_empty() {
586 return Err(TranscodeError::ContainerError(
587 "Input container has no streams".to_string(),
588 ));
589 }
590
591 let audio_stream_indices: Vec<usize> = streams
593 .iter()
594 .filter(|s| s.is_audio())
595 .map(|s| s.index)
596 .collect();
597
598 if let Some(ref vc) = self.config.video_codec {
600 debug!("Video codec override requested: {} (stream-copy path)", vc);
601 }
602 if let Some(ref ac) = self.config.audio_codec {
603 debug!("Audio codec override requested: {} (stream-copy path)", ac);
604 }
605 if self.normalization_gain_db.abs() > 0.01 {
606 info!(
607 "Normalization gain {:.2} dB will be applied to {} audio stream(s)",
608 self.normalization_gain_db,
609 audio_stream_indices.len()
610 );
611 }
612
613 let mux_config = MuxerConfig::new().with_writing_app("OxiMedia-Transcode");
614
615 let stats = match out_format {
616 ContainerFormat::Matroska => {
617 let sink = FileSource::create(output_path)
618 .await
619 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
620 let mut muxer = MatroskaMuxer::new(sink, mux_config);
621 for stream in &streams {
622 muxer
623 .add_stream(stream.clone())
624 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
625 }
626 muxer
627 .write_header()
628 .await
629 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
630
631 let stats = drain_packets_with_gain(
632 demuxer,
633 &mut muxer,
634 &self.progress_tracker,
635 &audio_stream_indices,
636 self.normalization_gain_db,
637 )
638 .await?;
639
640 muxer
641 .write_trailer()
642 .await
643 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
644
645 stats
646 }
647 ContainerFormat::Ogg => {
648 let sink = FileSource::create(output_path)
649 .await
650 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
651 let mut muxer = OggMuxer::new(sink, mux_config);
652 for stream in &streams {
653 muxer
654 .add_stream(stream.clone())
655 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
656 }
657 muxer
658 .write_header()
659 .await
660 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
661
662 let stats = drain_packets_with_gain(
663 demuxer,
664 &mut muxer,
665 &self.progress_tracker,
666 &audio_stream_indices,
667 self.normalization_gain_db,
668 )
669 .await?;
670
671 muxer
672 .write_trailer()
673 .await
674 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
675
676 stats
677 }
678 other => {
679 return Err(TranscodeError::ContainerError(format!(
680 "Unsupported output container format: {:?}",
681 other
682 )));
683 }
684 };
685
686 Ok(stats)
687 }
688
689 async fn verify_output(&self) -> Result<TranscodeOutput> {
697 let output_path = &self.config.output;
698
699 #[cfg(not(target_arch = "wasm32"))]
700 let metadata = tokio::fs::metadata(output_path).await.map_err(|e| {
701 TranscodeError::IoError(format!(
702 "Output file '{}' not found or unreadable: {}",
703 output_path.display(),
704 e
705 ))
706 })?;
707 #[cfg(target_arch = "wasm32")]
708 let metadata = std::fs::metadata(output_path).map_err(|e| {
709 TranscodeError::IoError(format!(
710 "Output file '{}' not found or unreadable: {}",
711 output_path.display(),
712 e
713 ))
714 })?;
715
716 let file_size = metadata.len();
717 if file_size == 0 {
718 return Err(TranscodeError::PipelineError(
719 "Output file is empty — transcode may have failed".to_string(),
720 ));
721 }
722
723 let encoding_time = match self.encode_start {
724 Some(t) => t.elapsed().as_secs_f64(),
725 None => 0.0,
726 };
727
728 let duration_approx = derive_duration_approx(file_size);
732
733 let speed_factor = if encoding_time > 0.0 && duration_approx > 0.0 {
734 duration_approx / encoding_time
735 } else {
736 1.0
737 };
738
739 let total_frames =
742 self.accumulated_stats.video_frames + self.accumulated_stats.audio_frames;
743 let video_bitrate_approx = if duration_approx > 0.0
744 && self.accumulated_stats.video_frames > 0
745 && total_frames > 0
746 {
747 let video_fraction = self.accumulated_stats.video_frames as f64 / total_frames as f64;
748 ((self.accumulated_stats.bytes_out as f64 * video_fraction * 8.0) / duration_approx)
749 as u64
750 } else {
751 0u64
752 };
753 let audio_bitrate_approx = if duration_approx > 0.0
754 && self.accumulated_stats.audio_frames > 0
755 && total_frames > 0
756 {
757 let audio_fraction = self.accumulated_stats.audio_frames as f64 / total_frames as f64;
758 ((self.accumulated_stats.bytes_out as f64 * audio_fraction * 8.0) / duration_approx)
759 as u64
760 } else {
761 0u64
762 };
763
764 info!(
765 "Transcode complete: {} video frames, {} audio frames, \
766 {} bytes in → {} bytes out, encoding time {:.2}s, speed {:.2}×",
767 self.accumulated_stats.video_frames,
768 self.accumulated_stats.audio_frames,
769 self.accumulated_stats.bytes_in,
770 self.accumulated_stats.bytes_out,
771 encoding_time,
772 speed_factor
773 );
774
775 Ok(TranscodeOutput {
776 output_path: output_path
777 .to_str()
778 .map(String::from)
779 .unwrap_or_else(|| output_path.display().to_string()),
780 file_size,
781 duration: duration_approx,
782 video_bitrate: video_bitrate_approx,
783 audio_bitrate: audio_bitrate_approx,
784 encoding_time,
785 speed_factor,
786 })
787 }
788}
789
790async fn drain_packets_with_gain<D, M>(
802 demuxer: &mut D,
803 muxer: &mut M,
804 _progress: &Option<ProgressTracker>,
805 audio_stream_indices: &[usize],
806 gain_db: f64,
807) -> Result<PassStats>
808where
809 D: Demuxer,
810 M: Muxer,
811{
812 let mut stats = PassStats::default();
813 let gain_linear = 10f64.powf(gain_db / 20.0) as f32;
815 let apply_gain = gain_db.abs() > 0.01 && !audio_stream_indices.is_empty();
816
817 loop {
818 match demuxer.read_packet().await {
819 Ok(mut pkt) => {
820 if pkt.should_discard() {
821 continue;
822 }
823
824 let raw_len = pkt.data.len() as u64;
825 stats.bytes_in += raw_len;
826
827 if audio_stream_indices.contains(&pkt.stream_index) {
828 if apply_gain {
830 pkt.data = apply_i16_gain(pkt.data, gain_linear);
831 }
832 stats.audio_frames += 1;
833 } else {
834 stats.video_frames += 1;
835 }
836
837 let out_len = pkt.data.len() as u64;
838 stats.bytes_out += out_len;
839
840 muxer
841 .write_packet(&pkt)
842 .await
843 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
844
845 let total = stats.video_frames + stats.audio_frames;
846 if total % 500 == 0 {
847 debug!(
848 "Remuxed {} packets ({} video, {} audio)",
849 total, stats.video_frames, stats.audio_frames
850 );
851 }
852 }
853 Err(e) if e.is_eof() => break,
854 Err(e) => {
855 return Err(TranscodeError::ContainerError(format!(
856 "Error reading packet: {}",
857 e
858 )));
859 }
860 }
861 }
862
863 debug!(
864 "drain_packets_with_gain: {} video frames, {} audio frames, \
865 {} bytes in, {} bytes out",
866 stats.video_frames, stats.audio_frames, stats.bytes_in, stats.bytes_out
867 );
868 Ok(stats)
869}
870
871fn apply_i16_gain(data: bytes::Bytes, gain_linear: f32) -> bytes::Bytes {
877 if (gain_linear - 1.0).abs() < f32::EPSILON {
878 return data;
879 }
880 let mut buf: Vec<u8> = data.into();
881 let n_samples = buf.len() / 2;
882 for i in 0..n_samples {
883 let lo = buf[i * 2];
884 let hi = buf[i * 2 + 1];
885 let sample = i16::from_le_bytes([lo, hi]) as f32;
886 let gained = (sample * gain_linear).clamp(i16::MIN as f32, i16::MAX as f32) as i16;
887 let out = gained.to_le_bytes();
888 buf[i * 2] = out[0];
889 buf[i * 2 + 1] = out[1];
890 }
891 bytes::Bytes::from(buf)
892}
893
894async fn count_packets<D: Demuxer>(demuxer: &mut D) -> u64 {
896 let mut count: u64 = 0;
897 loop {
898 match demuxer.read_packet().await {
899 Ok(_) => count += 1,
900 Err(e) if e.is_eof() => break,
901 Err(_) => break,
902 }
903 }
904 count
905}
906
907async fn feed_audio_packets_to_meter<D: Demuxer>(demuxer: &mut D, meter: &mut LoudnessMeter) {
913 let audio_stream_indices: Vec<usize> = demuxer
914 .streams()
915 .iter()
916 .filter(|s| s.is_audio())
917 .map(|s| s.index)
918 .collect();
919
920 loop {
921 match demuxer.read_packet().await {
922 Ok(pkt) => {
923 if !audio_stream_indices.contains(&pkt.stream_index) {
924 continue;
925 }
926 let samples = bytes_as_f32_samples(&pkt.data);
928 if !samples.is_empty() {
929 meter.process_f32(&samples);
930 }
931 }
932 Err(e) if e.is_eof() => break,
933 Err(_) => break,
934 }
935 }
936}
937
938fn bytes_as_f32_samples(data: &[u8]) -> Vec<f32> {
941 let n_samples = data.len() / 4;
942 let mut out = Vec::with_capacity(n_samples);
943 for i in 0..n_samples {
944 let base = i * 4;
945 let raw = u32::from_le_bytes([data[base], data[base + 1], data[base + 2], data[base + 3]]);
946 out.push(f32::from_bits(raw));
947 }
948 out
949}
950
951fn derive_duration_approx(file_size: u64) -> f64 {
957 const BYTES_PER_SECOND: f64 = 625_000.0;
959 file_size as f64 / BYTES_PER_SECOND
960}
961
962pub struct TranscodePipeline {
966 config: PipelineConfig,
967}
968
969impl TranscodePipeline {
970 #[must_use]
972 pub fn builder() -> TranscodePipelineBuilder {
973 TranscodePipelineBuilder::new()
974 }
975
976 pub fn set_video_codec(&mut self, codec: &str) {
978 self.config.video_codec = Some(codec.to_string());
979 }
980
981 pub fn set_audio_codec(&mut self, codec: &str) {
983 self.config.audio_codec = Some(codec.to_string());
984 }
985
986 pub async fn execute(&mut self) -> crate::Result<TranscodeOutput> {
992 let mut pipeline = Pipeline::new(self.config.clone());
993 pipeline.execute().await
994 }
995}
996
997pub struct TranscodePipelineBuilder {
1001 input: Option<PathBuf>,
1002 output: Option<PathBuf>,
1003 video_codec: Option<String>,
1004 audio_codec: Option<String>,
1005 quality: Option<QualityConfig>,
1006 multipass: Option<MultiPassMode>,
1007 normalization: Option<NormalizationConfig>,
1008 track_progress: bool,
1009 hw_accel: bool,
1010}
1011
1012impl TranscodePipelineBuilder {
1013 #[must_use]
1015 pub fn new() -> Self {
1016 Self {
1017 input: None,
1018 output: None,
1019 video_codec: None,
1020 audio_codec: None,
1021 quality: None,
1022 multipass: None,
1023 normalization: None,
1024 track_progress: false,
1025 hw_accel: true,
1026 }
1027 }
1028
1029 #[must_use]
1031 pub fn input(mut self, path: impl Into<PathBuf>) -> Self {
1032 self.input = Some(path.into());
1033 self
1034 }
1035
1036 #[must_use]
1038 pub fn output(mut self, path: impl Into<PathBuf>) -> Self {
1039 self.output = Some(path.into());
1040 self
1041 }
1042
1043 #[must_use]
1045 pub fn video_codec(mut self, codec: impl Into<String>) -> Self {
1046 self.video_codec = Some(codec.into());
1047 self
1048 }
1049
1050 #[must_use]
1052 pub fn audio_codec(mut self, codec: impl Into<String>) -> Self {
1053 self.audio_codec = Some(codec.into());
1054 self
1055 }
1056
1057 #[must_use]
1059 pub fn quality(mut self, quality: QualityConfig) -> Self {
1060 self.quality = Some(quality);
1061 self
1062 }
1063
1064 #[must_use]
1066 pub fn multipass(mut self, mode: MultiPassMode) -> Self {
1067 self.multipass = Some(mode);
1068 self
1069 }
1070
1071 #[must_use]
1073 pub fn normalization(mut self, config: NormalizationConfig) -> Self {
1074 self.normalization = Some(config);
1075 self
1076 }
1077
1078 #[must_use]
1080 pub fn track_progress(mut self, enable: bool) -> Self {
1081 self.track_progress = enable;
1082 self
1083 }
1084
1085 #[must_use]
1087 pub fn hw_accel(mut self, enable: bool) -> Self {
1088 self.hw_accel = enable;
1089 self
1090 }
1091
1092 pub fn build(self) -> crate::Result<TranscodePipeline> {
1098 let input = self
1099 .input
1100 .ok_or_else(|| TranscodeError::InvalidInput("Input path not specified".to_string()))?;
1101
1102 let output = self.output.ok_or_else(|| {
1103 TranscodeError::InvalidOutput("Output path not specified".to_string())
1104 })?;
1105
1106 let multipass_config = self
1107 .multipass
1108 .map(|mode| MultiPassConfig::new(mode, "/tmp/transcode_stats.log"));
1109
1110 Ok(TranscodePipeline {
1111 config: PipelineConfig {
1112 input,
1113 output,
1114 video_codec: self.video_codec,
1115 audio_codec: self.audio_codec,
1116 quality: self.quality,
1117 multipass: multipass_config,
1118 normalization: self.normalization,
1119 track_progress: self.track_progress,
1120 hw_accel: self.hw_accel,
1121 },
1122 })
1123 }
1124}
1125
1126impl Default for TranscodePipelineBuilder {
1127 fn default() -> Self {
1128 Self::new()
1129 }
1130}
1131
1132#[cfg(test)]
1135mod tests {
1136 use super::*;
1137
1138 #[test]
1139 fn test_pipeline_builder() {
1140 let result = TranscodePipelineBuilder::new()
1141 .input("/tmp/input.mkv")
1142 .output("/tmp/output.mkv")
1143 .video_codec("vp9")
1144 .audio_codec("opus")
1145 .track_progress(true)
1146 .hw_accel(false)
1147 .build();
1148
1149 assert!(result.is_ok());
1150 let pipeline = result.expect("should succeed in test");
1151 assert_eq!(pipeline.config.input, PathBuf::from("/tmp/input.mkv"));
1152 assert_eq!(pipeline.config.output, PathBuf::from("/tmp/output.mkv"));
1153 assert_eq!(pipeline.config.video_codec, Some("vp9".to_string()));
1154 assert_eq!(pipeline.config.audio_codec, Some("opus".to_string()));
1155 assert!(pipeline.config.track_progress);
1156 assert!(!pipeline.config.hw_accel);
1157 }
1158
1159 #[test]
1160 fn test_pipeline_builder_missing_input() {
1161 let result = TranscodePipelineBuilder::new()
1162 .output("/tmp/output.mkv")
1163 .build();
1164 assert!(result.is_err());
1165 }
1166
1167 #[test]
1168 fn test_pipeline_builder_missing_output() {
1169 let result = TranscodePipelineBuilder::new()
1170 .input("/tmp/input.mkv")
1171 .build();
1172 assert!(result.is_err());
1173 }
1174
1175 #[test]
1176 fn test_pipeline_stage_flow() {
1177 let config = PipelineConfig {
1178 input: PathBuf::from("/tmp/input.mkv"),
1179 output: PathBuf::from("/tmp/output.mkv"),
1180 video_codec: None,
1181 audio_codec: None,
1182 quality: None,
1183 multipass: None,
1184 normalization: None,
1185 track_progress: false,
1186 hw_accel: true,
1187 };
1188
1189 let pipeline = Pipeline::new(config);
1190 assert!(matches!(
1191 pipeline.current_stage(),
1192 PipelineStage::Validation
1193 ));
1194 }
1195
1196 #[test]
1197 fn test_output_format_from_path() {
1198 assert!(matches!(
1199 output_format_from_path(std::path::Path::new("out.mkv")),
1200 ContainerFormat::Matroska
1201 ));
1202 assert!(matches!(
1203 output_format_from_path(std::path::Path::new("out.webm")),
1204 ContainerFormat::Matroska
1205 ));
1206 assert!(matches!(
1207 output_format_from_path(std::path::Path::new("out.ogg")),
1208 ContainerFormat::Ogg
1209 ));
1210 }
1211
1212 #[test]
1213 fn test_bytes_as_f32_samples_empty() {
1214 let samples = bytes_as_f32_samples(&[]);
1215 assert!(samples.is_empty());
1216 }
1217
1218 #[test]
1219 fn test_bytes_as_f32_samples_partial() {
1220 let data = [0u8; 7];
1222 let samples = bytes_as_f32_samples(&data);
1223 assert_eq!(samples.len(), 1);
1224 }
1225
1226 #[test]
1227 fn test_bytes_as_f32_known_value() {
1228 let data = [0x00u8, 0x00, 0x00, 0x00];
1230 let samples = bytes_as_f32_samples(&data);
1231 assert_eq!(samples.len(), 1);
1232 assert_eq!(samples[0], 0.0f32);
1233 }
1234
1235 #[test]
1238 fn test_apply_i16_gain_unity() {
1239 let sample: i16 = 1234;
1241 let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1242 let out = apply_i16_gain(raw.clone(), 1.0);
1243 assert_eq!(&out[..], &raw[..]);
1244 }
1245
1246 #[test]
1247 fn test_apply_i16_gain_double() {
1248 let sample: i16 = 1000;
1250 let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1251 let out = apply_i16_gain(raw, 2.0);
1252 let result = i16::from_le_bytes([out[0], out[1]]);
1253 assert_eq!(result, 2000);
1254 }
1255
1256 #[test]
1257 fn test_apply_i16_gain_clamp_positive() {
1258 let sample: i16 = i16::MAX;
1260 let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1261 let out = apply_i16_gain(raw, 2.0);
1262 let result = i16::from_le_bytes([out[0], out[1]]);
1263 assert_eq!(result, i16::MAX);
1264 }
1265
1266 #[test]
1267 fn test_apply_i16_gain_clamp_negative() {
1268 let sample: i16 = i16::MIN;
1270 let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1271 let out = apply_i16_gain(raw, 2.0);
1272 let result = i16::from_le_bytes([out[0], out[1]]);
1273 assert_eq!(result, i16::MIN);
1274 }
1275
1276 #[test]
1277 fn test_apply_i16_gain_half() {
1278 let sample: i16 = 2000;
1280 let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1281 let out = apply_i16_gain(raw, 0.5);
1282 let result = i16::from_le_bytes([out[0], out[1]]);
1283 assert_eq!(result, 1000);
1284 }
1285
1286 #[test]
1287 fn test_apply_i16_gain_odd_byte_length() {
1288 let raw = bytes::Bytes::from(vec![0xFFu8, 0x7F, 0xAB]); let out = apply_i16_gain(raw, 2.0);
1291 let result = i16::from_le_bytes([out[0], out[1]]);
1293 assert_eq!(result, i16::MAX);
1294 assert_eq!(out[2], 0xAB);
1296 }
1297
1298 #[test]
1301 fn test_pass_stats_default() {
1302 let stats = PassStats::default();
1303 assert_eq!(stats.bytes_in, 0);
1304 assert_eq!(stats.bytes_out, 0);
1305 assert_eq!(stats.video_frames, 0);
1306 assert_eq!(stats.audio_frames, 0);
1307 }
1308
1309 #[tokio::test]
1317 async fn test_pipeline_execute_remux_produces_output() {
1318 use oximedia_container::{
1319 mux::{MatroskaMuxer, MuxerConfig},
1320 Muxer, Packet, PacketFlags, StreamInfo,
1321 };
1322 use oximedia_core::{CodecId, Rational, Timestamp};
1323 use oximedia_io::MemorySource;
1324
1325 let in_buf = MemorySource::new_writable(64 * 1024);
1327 let mut muxer = MatroskaMuxer::new(in_buf, MuxerConfig::new());
1328
1329 let mut video = StreamInfo::new(0, CodecId::Vp9, Rational::new(1, 1000));
1330 video.codec_params.width = Some(320);
1331 video.codec_params.height = Some(240);
1332 muxer.add_stream(video).expect("add stream");
1333 muxer.write_header().await.expect("write header");
1334
1335 for i in 0u64..30 {
1337 let data = vec![0x42u8, 0x00, (i & 0xFF) as u8, 0x01];
1338 let pkt = Packet::new(
1339 0,
1340 bytes::Bytes::from(data),
1341 Timestamp::new(i as i64 * 33, Rational::new(1, 1000)),
1342 PacketFlags::KEYFRAME,
1343 );
1344 muxer.write_packet(&pkt).await.expect("write packet");
1345 }
1346 muxer.write_trailer().await.expect("write trailer");
1347
1348 let tmp_dir = std::env::temp_dir();
1350 let input_path = tmp_dir.join("pipeline_test_input.mkv");
1351 let output_path = tmp_dir.join("pipeline_test_output.mkv");
1352
1353 let sink = muxer.into_sink();
1354 let mkv_bytes = sink.written_data().to_vec();
1355 tokio::fs::write(&input_path, &mkv_bytes)
1356 .await
1357 .expect("write temp input");
1358
1359 let mut pipeline = TranscodePipelineBuilder::new()
1361 .input(input_path.clone())
1362 .output(output_path.clone())
1363 .build()
1364 .expect("build pipeline");
1365
1366 let result = pipeline.execute().await;
1367
1368 let _ = tokio::fs::remove_file(&input_path).await;
1370 let _ = tokio::fs::remove_file(&output_path).await;
1371
1372 let output = result.expect("pipeline execute should succeed");
1373
1374 assert!(
1376 output.file_size > 0,
1377 "output file size must be > 0, got {}",
1378 output.file_size
1379 );
1380 assert!(
1381 output.encoding_time >= 0.0,
1382 "encoding time must be non-negative"
1383 );
1384 }
1385
1386 #[tokio::test]
1389 async fn test_pipeline_execute_with_normalization_gain() {
1390 use crate::{LoudnessStandard, NormalizationConfig};
1391 use oximedia_container::{
1392 mux::{MatroskaMuxer, MuxerConfig},
1393 Muxer, Packet, PacketFlags, StreamInfo,
1394 };
1395 use oximedia_core::{CodecId, Rational, Timestamp};
1396 use oximedia_io::MemorySource;
1397
1398 let in_buf = MemorySource::new_writable(64 * 1024);
1400 let mut muxer = MatroskaMuxer::new(in_buf, MuxerConfig::new());
1401
1402 let mut audio = StreamInfo::new(0, CodecId::Opus, Rational::new(1, 48000));
1403 audio.codec_params.sample_rate = Some(48000);
1404 audio.codec_params.channels = Some(2);
1405 muxer.add_stream(audio).expect("add audio stream");
1406 muxer.write_header().await.expect("write header");
1407
1408 for i in 0u64..20 {
1410 let sample_le: i16 = 100;
1412 let mut data = Vec::with_capacity(32);
1413 for _ in 0..16 {
1414 data.extend_from_slice(&sample_le.to_le_bytes());
1415 }
1416 let pkt = Packet::new(
1417 0,
1418 bytes::Bytes::from(data),
1419 Timestamp::new(i as i64 * 960, Rational::new(1, 48000)),
1420 PacketFlags::KEYFRAME,
1421 );
1422 muxer.write_packet(&pkt).await.expect("write audio packet");
1423 }
1424 muxer.write_trailer().await.expect("write trailer");
1425
1426 let tmp_dir = std::env::temp_dir();
1427 let input_path = tmp_dir.join("pipeline_norm_input.mkv");
1428 let output_path = tmp_dir.join("pipeline_norm_output.mkv");
1429
1430 let sink = muxer.into_sink();
1431 let mkv_bytes = sink.written_data().to_vec();
1432 tokio::fs::write(&input_path, &mkv_bytes)
1433 .await
1434 .expect("write temp input");
1435
1436 let norm_config = NormalizationConfig::new(LoudnessStandard::EbuR128);
1438 let pipeline_config = PipelineConfig {
1440 input: input_path.clone(),
1441 output: output_path.clone(),
1442 video_codec: None,
1443 audio_codec: None,
1444 quality: None,
1445 multipass: None,
1446 normalization: Some(norm_config),
1447 track_progress: false,
1448 hw_accel: false,
1449 };
1450 let mut pipeline_inner = Pipeline::new(pipeline_config);
1451 pipeline_inner.normalization_gain_db = 6.0206;
1453 pipeline_inner.encode_start = Some(std::time::Instant::now());
1454 pipeline_inner.current_stage = PipelineStage::Encode;
1455
1456 let pass_stats = pipeline_inner.execute_single_pass().await;
1457
1458 let _ = tokio::fs::remove_file(&input_path).await;
1459 let _ = tokio::fs::remove_file(&output_path).await;
1460
1461 let stats = pass_stats.expect("single-pass should succeed");
1462 assert!(
1463 stats.audio_frames > 0,
1464 "must have processed at least one audio frame"
1465 );
1466 assert!(
1467 stats.bytes_in > 0,
1468 "must have read at least some bytes from input"
1469 );
1470 assert!(
1471 stats.bytes_out > 0,
1472 "must have written at least some bytes to output"
1473 );
1474 }
1475}