1use crate::{
65 make_video_encoder, MultiPassConfig, MultiPassEncoder, MultiPassMode, NormalizationConfig,
66 ProgressTracker, QualityConfig, RateControlMode, Result, TranscodeError, TranscodeOutput,
67 VideoEncoderParams,
68};
69use oximedia_container::{
70 demux::{Demuxer, FlacDemuxer, MatroskaDemuxer, OggDemuxer, WavDemuxer},
71 mux::{MatroskaMuxer, MuxerConfig, OggMuxer},
72 probe_format, ContainerFormat, Muxer, StreamInfo,
73};
74use oximedia_core::CodecId;
75use oximedia_io::FileSource;
76use oximedia_metering::{LoudnessMeter, MeterConfig, Standard};
77use std::path::PathBuf;
78use std::time::Instant;
79use tracing::{debug, info, warn};
80
/// Number of leading bytes read from the input file and handed to
/// `probe_format` when detecting the container format.
const PROBE_BYTES: usize = 16 * 1024;

/// Sample rate passed to the loudness meter during audio analysis.
/// NOTE(review): the stream's own sample rate is not consulted — presumably Hz; confirm.
const DEFAULT_SAMPLE_RATE: f64 = 48_000.0;

/// Channel count passed to the loudness meter during audio analysis.
const DEFAULT_CHANNELS: usize = 2;

/// Quality handed to an intra-only encoder when the quality config carries
/// no CRF rate-control value.
const INTRA_DEFAULT_QUALITY: u8 = 85;

/// Frame width used for intra-encoder parameters when no video stream with
/// known dimensions is found.
const INTRA_FALLBACK_WIDTH: u32 = 1920;

/// Frame height used for intra-encoder parameters when no video stream with
/// known dimensions is found.
const INTRA_FALLBACK_HEIGHT: u32 = 1080;
102
103fn parse_intra_codec(name: &str) -> Option<CodecId> {
108 match name.to_lowercase().as_str() {
109 "mjpeg" | "motion-jpeg" | "motion_jpeg" => Some(CodecId::Mjpeg),
110 "apv" => Some(CodecId::Apv),
111 _ => None,
112 }
113}
114
115fn intra_encoder_params(streams: &[StreamInfo], quality: u8) -> Result<VideoEncoderParams> {
121 let (w, h) = streams
122 .iter()
123 .find(|s| s.is_video())
124 .and_then(|s| {
125 let w = s.codec_params.width?;
126 let h = s.codec_params.height?;
127 Some((w, h))
128 })
129 .unwrap_or((INTRA_FALLBACK_WIDTH, INTRA_FALLBACK_HEIGHT));
130
131 VideoEncoderParams::new(w, h, quality)
132}
133
/// Counters collected while packets are copied from a demuxer to a muxer.
#[derive(Debug, Clone, Default)]
struct PassStats {
    /// Sum of the payload lengths of all packets read (discarded packets excluded).
    bytes_in: u64,
    /// Sum of the payload lengths of all packets handed to the muxer.
    bytes_out: u64,
    /// Number of packets that did not belong to an audio stream.
    video_frames: u64,
    /// Number of packets that belonged to an audio stream.
    audio_frames: u64,
}
151
152async fn detect_format(path: &std::path::Path) -> Result<ContainerFormat> {
156 let mut source = FileSource::open(path)
157 .await
158 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
159
160 use oximedia_io::MediaSource;
161 let mut buf = vec![0u8; PROBE_BYTES];
162 let n = source
163 .read(&mut buf)
164 .await
165 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
166 buf.truncate(n);
167
168 let probe = probe_format(&buf).map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
169 Ok(probe.format)
170}
171
172fn output_format_from_path(path: &std::path::Path) -> ContainerFormat {
174 match path
175 .extension()
176 .and_then(|e| e.to_str())
177 .map(str::to_lowercase)
178 .as_deref()
179 {
180 Some("mkv") | Some("webm") => ContainerFormat::Matroska,
181 Some("ogg") | Some("oga") | Some("opus") => ContainerFormat::Ogg,
182 Some("flac") => ContainerFormat::Flac,
183 Some("wav") => ContainerFormat::Wav,
184 _ => ContainerFormat::Matroska,
186 }
187}
188
/// Stage a [`Pipeline`] is currently in, as reported by `Pipeline::current_stage`.
#[derive(Debug, Clone)]
pub enum PipelineStage {
    /// Input and output paths are being validated (also the initial stage).
    Validation,
    /// Audio loudness is being measured; only entered when normalization is configured.
    AudioAnalysis,
    /// Pass number 1 of a multi-pass run.
    FirstPass,
    /// Pass number 2 of a multi-pass run.
    SecondPass,
    /// Any multi-pass pass whose number is neither 1 nor 2.
    ThirdPass,
    /// The single encode step used when no multi-pass config is present.
    Encode,
    /// The written output file is being checked.
    Verification,
}
209
/// Settings consumed by [`Pipeline`].
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Path of the media file to read.
    pub input: PathBuf,
    /// Path of the file to write; its extension selects the output container.
    pub output: PathBuf,
    /// Requested video codec name. `None`, the stream-copy spellings
    /// ("copy", "stream-copy", "stream_copy") and the intra codec names are
    /// accepted by the pipeline; any other name is rejected as unsupported.
    pub video_codec: Option<String>,
    /// Requested audio codec name. Only `None` and the stream-copy spellings
    /// are accepted by the pipeline.
    pub audio_codec: Option<String>,
    /// Quality settings; a CRF rate-control value is used as the intra-codec quality.
    pub quality: Option<QualityConfig>,
    /// When set, the pipeline runs the multi-pass loop instead of a single pass.
    pub multipass: Option<MultiPassConfig>,
    /// When set, a loudness analysis runs before encoding and a gain is derived from it.
    pub normalization: Option<NormalizationConfig>,
    /// Progress-tracking flag. NOTE(review): not read anywhere in the visible code.
    pub track_progress: bool,
    /// Hardware-acceleration flag. NOTE(review): not read anywhere in the visible code.
    pub hw_accel: bool,
}
234
/// Drives one transcode run: validation, optional loudness analysis,
/// single- or multi-pass remux, and output verification.
pub struct Pipeline {
    /// Settings for this run.
    config: PipelineConfig,
    /// Stage currently being executed.
    current_stage: PipelineStage,
    /// Optional tracker passed down to the packet-copy loop.
    progress_tracker: Option<ProgressTracker>,
    /// Gain in dB derived by the audio analysis; 0.0 until an analysis sets it.
    normalization_gain_db: f64,
    /// Instant at which encoding started, used to compute the encoding time.
    encode_start: Option<Instant>,
    /// Packet/byte counters gathered by the encode pass(es).
    accumulated_stats: PassStats,
}
249
250impl Pipeline {
251 #[must_use]
253 pub fn new(config: PipelineConfig) -> Self {
254 Self {
255 config,
256 current_stage: PipelineStage::Validation,
257 progress_tracker: None,
258 normalization_gain_db: 0.0,
259 encode_start: None,
260 accumulated_stats: PassStats::default(),
261 }
262 }
263
264 pub fn set_progress_tracker(&mut self, tracker: ProgressTracker) {
266 self.progress_tracker = Some(tracker);
267 }
268
269 pub async fn execute(&mut self) -> Result<TranscodeOutput> {
275 self.current_stage = PipelineStage::Validation;
277 self.validate()?;
278
279 if self.config.normalization.is_some() {
281 self.current_stage = PipelineStage::AudioAnalysis;
282 self.analyze_audio().await?;
283 }
284
285 self.encode_start = Some(Instant::now());
287
288 if let Some(multipass_config) = &self.config.multipass {
290 let mut encoder = MultiPassEncoder::new(multipass_config.clone());
291
292 while encoder.has_more_passes() {
293 let pass = encoder.current_pass();
294 self.current_stage = match pass {
295 1 => PipelineStage::FirstPass,
296 2 => PipelineStage::SecondPass,
297 _ => PipelineStage::ThirdPass,
298 };
299
300 self.execute_pass(pass, &encoder).await?;
301 encoder.next_pass();
302 }
303
304 encoder.cleanup()?;
306 } else {
307 self.current_stage = PipelineStage::Encode;
309 let stats = self.execute_single_pass().await?;
310 self.accumulated_stats.bytes_in += stats.bytes_in;
311 self.accumulated_stats.bytes_out += stats.bytes_out;
312 self.accumulated_stats.video_frames += stats.video_frames;
313 self.accumulated_stats.audio_frames += stats.audio_frames;
314 }
315
316 self.current_stage = PipelineStage::Verification;
318 self.verify_output().await
319 }
320
    /// Returns the stage the pipeline is currently in (or last entered).
    #[must_use]
    pub fn current_stage(&self) -> &PipelineStage {
        &self.current_stage
    }
326
327 fn requires_frame_level(&self) -> bool {
343 let video_needs_frame_level = self.config.video_codec.as_deref().map_or(false, |vc| {
344 let lc = vc.to_lowercase();
345 lc != "copy"
346 && lc != "stream-copy"
347 && lc != "stream_copy"
348 && parse_intra_codec(vc).is_none()
349 });
350
351 let audio_needs_frame_level = self.config.audio_codec.as_deref().map_or(false, |ac| {
352 let lc = ac.to_lowercase();
353 lc != "copy" && lc != "stream-copy" && lc != "stream_copy"
354 });
355
356 video_needs_frame_level || audio_needs_frame_level
357 }
358
359 fn validate(&self) -> Result<()> {
362 use crate::validation::{InputValidator, OutputValidator};
363
364 InputValidator::validate_path(
365 self.config
366 .input
367 .to_str()
368 .ok_or_else(|| TranscodeError::InvalidInput("Invalid input path".to_string()))?,
369 )?;
370
371 OutputValidator::validate_path(
372 self.config
373 .output
374 .to_str()
375 .ok_or_else(|| TranscodeError::InvalidOutput("Invalid output path".to_string()))?,
376 true,
377 )?;
378
379 Ok(())
380 }
381
382 async fn analyze_audio(&mut self) -> Result<()> {
392 let norm_config = match &self.config.normalization {
393 Some(c) => c.clone(),
394 None => return Ok(()),
395 };
396
397 info!(
398 "Analysing audio loudness for normalization (target: {} LUFS)",
399 norm_config.standard.target_lufs()
400 );
401
402 let format = detect_format(&self.config.input).await?;
403
404 let sample_rate = DEFAULT_SAMPLE_RATE;
406 let channels = DEFAULT_CHANNELS;
407
408 let meter_config = MeterConfig::minimal(Standard::EbuR128, sample_rate, channels);
409
410 let mut meter = LoudnessMeter::new(meter_config)
411 .map_err(|e| TranscodeError::NormalizationError(e.to_string()))?;
412
413 match format {
415 ContainerFormat::Matroska => {
416 let source = FileSource::open(&self.config.input)
417 .await
418 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
419 let mut demuxer = MatroskaDemuxer::new(source);
420 demuxer
421 .probe()
422 .await
423 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
424
425 feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
426 }
427 ContainerFormat::Ogg => {
428 let source = FileSource::open(&self.config.input)
429 .await
430 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
431 let mut demuxer = OggDemuxer::new(source);
432 demuxer
433 .probe()
434 .await
435 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
436
437 feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
438 }
439 ContainerFormat::Wav => {
440 let source = FileSource::open(&self.config.input)
441 .await
442 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
443 let mut demuxer = WavDemuxer::new(source);
444 demuxer
445 .probe()
446 .await
447 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
448
449 feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
450 }
451 ContainerFormat::Flac => {
452 let source = FileSource::open(&self.config.input)
453 .await
454 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
455 let mut demuxer = FlacDemuxer::new(source);
456 demuxer
457 .probe()
458 .await
459 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
460
461 feed_audio_packets_to_meter(&mut demuxer, &mut meter).await;
462 }
463 other => {
464 warn!(
465 "Audio analysis: unsupported format {:?} — skipping loudness scan",
466 other
467 );
468 return Ok(());
469 }
470 }
471
472 let metrics = meter.metrics();
473 let measured_lufs = metrics.integrated_lufs;
474 let measured_peak = metrics.true_peak_dbtp;
475 let target_lufs = norm_config.standard.target_lufs();
476 let max_peak = norm_config.standard.max_true_peak_dbtp();
477
478 let loudness_gain = target_lufs - measured_lufs;
480 let peak_headroom = max_peak - measured_peak;
481 self.normalization_gain_db = loudness_gain.min(peak_headroom);
482
483 info!(
484 "Audio analysis complete: measured {:.1} LUFS / {:.1} dBTP, \
485 required gain {:.2} dB",
486 measured_lufs, measured_peak, self.normalization_gain_db
487 );
488
489 Ok(())
490 }
491
492 async fn execute_pass(&mut self, pass: u32, _encoder: &MultiPassEncoder) -> Result<()> {
499 info!("Starting encode pass {}", pass);
500
501 if pass == 1 {
502 self.demux_and_count().await?;
504 } else {
505 let stats = self.execute_single_pass().await?;
507 self.accumulated_stats.bytes_in += stats.bytes_in;
508 self.accumulated_stats.bytes_out += stats.bytes_out;
509 self.accumulated_stats.video_frames += stats.video_frames;
510 self.accumulated_stats.audio_frames += stats.audio_frames;
511 }
512
513 Ok(())
514 }
515
516 async fn demux_and_count(&self) -> Result<u64> {
518 let format = detect_format(&self.config.input).await?;
519 let count = match format {
520 ContainerFormat::Matroska => {
521 let source = FileSource::open(&self.config.input)
522 .await
523 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
524 let mut demuxer = MatroskaDemuxer::new(source);
525 demuxer
526 .probe()
527 .await
528 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
529 count_packets(&mut demuxer).await
530 }
531 ContainerFormat::Ogg => {
532 let source = FileSource::open(&self.config.input)
533 .await
534 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
535 let mut demuxer = OggDemuxer::new(source);
536 demuxer
537 .probe()
538 .await
539 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
540 count_packets(&mut demuxer).await
541 }
542 ContainerFormat::Wav => {
543 let source = FileSource::open(&self.config.input)
544 .await
545 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
546 let mut demuxer = WavDemuxer::new(source);
547 demuxer
548 .probe()
549 .await
550 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
551 count_packets(&mut demuxer).await
552 }
553 ContainerFormat::Flac => {
554 let source = FileSource::open(&self.config.input)
555 .await
556 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
557 let mut demuxer = FlacDemuxer::new(source);
558 demuxer
559 .probe()
560 .await
561 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
562 count_packets(&mut demuxer).await
563 }
564 other => {
565 debug!("demux_and_count: unsupported format {:?}", other);
566 0
567 }
568 };
569
570 info!("Analysis pass: counted {} packets in input", count);
571 Ok(count)
572 }
573
574 async fn execute_single_pass(&self) -> Result<PassStats> {
585 let input_path = &self.config.input;
586 let output_path = &self.config.output;
587
588 info!(
589 "Single-pass transcode: {} → {}",
590 input_path.display(),
591 output_path.display()
592 );
593
594 let in_format = detect_format(input_path).await?;
595 let out_format = output_format_from_path(output_path);
596
597 debug!(
598 "Input format: {:?}, output format: {:?}",
599 in_format, out_format
600 );
601
602 if self.requires_frame_level() {
617 let vc = self.config.video_codec.as_deref().unwrap_or("(none)");
618 let ac = self.config.audio_codec.as_deref().unwrap_or("(none)");
619 return Err(TranscodeError::Unsupported(format!(
620 "Codec transcoding (video={vc}, audio={ac}) requires frame-level decode→encode. \
621 Use MultiTrackExecutor with per-track FrameDecoder/FrameEncoder instances \
622 instead of Pipeline::execute()."
623 )));
624 }
625
626 if let Some(parent) = output_path.parent() {
628 if !parent.as_os_str().is_empty() && !parent.exists() {
629 #[cfg(not(target_arch = "wasm32"))]
630 {
631 tokio::fs::create_dir_all(parent)
632 .await
633 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
634 }
635 #[cfg(target_arch = "wasm32")]
636 {
637 return Err(TranscodeError::IoError(
638 "Filesystem operations not supported on wasm32".to_string(),
639 ));
640 }
641 }
642 }
643
644 let stats = match in_format {
646 ContainerFormat::Matroska => {
647 let source = FileSource::open(input_path)
648 .await
649 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
650 let mut demuxer = MatroskaDemuxer::new(source);
651 demuxer
652 .probe()
653 .await
654 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
655 self.remux(&mut demuxer, out_format, output_path).await?
656 }
657 ContainerFormat::Ogg => {
658 let source = FileSource::open(input_path)
659 .await
660 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
661 let mut demuxer = OggDemuxer::new(source);
662 demuxer
663 .probe()
664 .await
665 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
666 self.remux(&mut demuxer, out_format, output_path).await?
667 }
668 ContainerFormat::Wav => {
669 let source = FileSource::open(input_path)
670 .await
671 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
672 let mut demuxer = WavDemuxer::new(source);
673 demuxer
674 .probe()
675 .await
676 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
677 self.remux(&mut demuxer, out_format, output_path).await?
678 }
679 ContainerFormat::Flac => {
680 let source = FileSource::open(input_path)
681 .await
682 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
683 let mut demuxer = FlacDemuxer::new(source);
684 demuxer
685 .probe()
686 .await
687 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
688 self.remux(&mut demuxer, out_format, output_path).await?
689 }
690 other => {
691 return Err(TranscodeError::ContainerError(format!(
692 "Unsupported input container format: {:?}",
693 other
694 )));
695 }
696 };
697
698 Ok(stats)
699 }
700
701 async fn remux<D>(
714 &self,
715 demuxer: &mut D,
716 out_format: ContainerFormat,
717 output_path: &std::path::Path,
718 ) -> Result<PassStats>
719 where
720 D: Demuxer,
721 {
722 let streams: Vec<StreamInfo> = demuxer.streams().to_vec();
724
725 if streams.is_empty() {
726 return Err(TranscodeError::ContainerError(
727 "Input container has no streams".to_string(),
728 ));
729 }
730
731 let audio_stream_indices: Vec<usize> = streams
733 .iter()
734 .filter(|s| s.is_audio())
735 .map(|s| s.index)
736 .collect();
737
738 if let Some(ref vc) = self.config.video_codec {
750 if let Some(intra_id) = parse_intra_codec(vc) {
751 let quality = self
752 .config
753 .quality
754 .as_ref()
755 .and_then(|q| {
756 if let RateControlMode::Crf(v) = q.rate_control {
757 Some(v)
758 } else {
759 None
760 }
761 })
762 .unwrap_or(INTRA_DEFAULT_QUALITY);
763
764 let params = intra_encoder_params(&streams, quality)?;
765 let _encoder = make_video_encoder(intra_id, ¶ms)?;
768 info!(
769 "Intra-codec {} encoder ready: {}×{} quality={}",
770 vc, params.width, params.height, quality
771 );
772 } else {
773 debug!("Video codec override requested: {} (stream-copy path)", vc);
774 }
775 }
776 if let Some(ref ac) = self.config.audio_codec {
777 debug!("Audio codec override requested: {} (stream-copy path)", ac);
778 }
779 if self.normalization_gain_db.abs() > 0.01 {
780 info!(
781 "Normalization gain {:.2} dB will be applied to {} audio stream(s)",
782 self.normalization_gain_db,
783 audio_stream_indices.len()
784 );
785 }
786
787 let mux_config = MuxerConfig::new().with_writing_app("OxiMedia-Transcode");
788
789 let stats = match out_format {
790 ContainerFormat::Matroska => {
791 let sink = FileSource::create(output_path)
792 .await
793 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
794 let mut muxer = MatroskaMuxer::new(sink, mux_config);
795 for stream in &streams {
796 muxer
797 .add_stream(stream.clone())
798 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
799 }
800 muxer
801 .write_header()
802 .await
803 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
804
805 let stats = drain_packets_with_gain(
806 demuxer,
807 &mut muxer,
808 &self.progress_tracker,
809 &audio_stream_indices,
810 self.normalization_gain_db,
811 )
812 .await?;
813
814 muxer
815 .write_trailer()
816 .await
817 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
818
819 stats
820 }
821 ContainerFormat::Ogg => {
822 let sink = FileSource::create(output_path)
823 .await
824 .map_err(|e| TranscodeError::IoError(e.to_string()))?;
825 let mut muxer = OggMuxer::new(sink, mux_config);
826 for stream in &streams {
827 muxer
828 .add_stream(stream.clone())
829 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
830 }
831 muxer
832 .write_header()
833 .await
834 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
835
836 let stats = drain_packets_with_gain(
837 demuxer,
838 &mut muxer,
839 &self.progress_tracker,
840 &audio_stream_indices,
841 self.normalization_gain_db,
842 )
843 .await?;
844
845 muxer
846 .write_trailer()
847 .await
848 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
849
850 stats
851 }
852 other => {
853 return Err(TranscodeError::ContainerError(format!(
854 "Unsupported output container format: {:?}",
855 other
856 )));
857 }
858 };
859
860 Ok(stats)
861 }
862
863 async fn verify_output(&self) -> Result<TranscodeOutput> {
871 let output_path = &self.config.output;
872
873 #[cfg(not(target_arch = "wasm32"))]
874 let metadata = tokio::fs::metadata(output_path).await.map_err(|e| {
875 TranscodeError::IoError(format!(
876 "Output file '{}' not found or unreadable: {}",
877 output_path.display(),
878 e
879 ))
880 })?;
881 #[cfg(target_arch = "wasm32")]
882 let metadata = std::fs::metadata(output_path).map_err(|e| {
883 TranscodeError::IoError(format!(
884 "Output file '{}' not found or unreadable: {}",
885 output_path.display(),
886 e
887 ))
888 })?;
889
890 let file_size = metadata.len();
891 if file_size == 0 {
892 return Err(TranscodeError::PipelineError(
893 "Output file is empty — transcode may have failed".to_string(),
894 ));
895 }
896
897 let encoding_time = match self.encode_start {
898 Some(t) => t.elapsed().as_secs_f64(),
899 None => 0.0,
900 };
901
902 let duration_approx = derive_duration_approx(file_size);
906
907 let speed_factor = if encoding_time > 0.0 && duration_approx > 0.0 {
908 duration_approx / encoding_time
909 } else {
910 1.0
911 };
912
913 let total_frames =
916 self.accumulated_stats.video_frames + self.accumulated_stats.audio_frames;
917 let video_bitrate_approx = if duration_approx > 0.0
918 && self.accumulated_stats.video_frames > 0
919 && total_frames > 0
920 {
921 let video_fraction = self.accumulated_stats.video_frames as f64 / total_frames as f64;
922 ((self.accumulated_stats.bytes_out as f64 * video_fraction * 8.0) / duration_approx)
923 as u64
924 } else {
925 0u64
926 };
927 let audio_bitrate_approx = if duration_approx > 0.0
928 && self.accumulated_stats.audio_frames > 0
929 && total_frames > 0
930 {
931 let audio_fraction = self.accumulated_stats.audio_frames as f64 / total_frames as f64;
932 ((self.accumulated_stats.bytes_out as f64 * audio_fraction * 8.0) / duration_approx)
933 as u64
934 } else {
935 0u64
936 };
937
938 info!(
939 "Transcode complete: {} video frames, {} audio frames, \
940 {} bytes in → {} bytes out, encoding time {:.2}s, speed {:.2}×",
941 self.accumulated_stats.video_frames,
942 self.accumulated_stats.audio_frames,
943 self.accumulated_stats.bytes_in,
944 self.accumulated_stats.bytes_out,
945 encoding_time,
946 speed_factor
947 );
948
949 Ok(TranscodeOutput {
950 output_path: output_path
951 .to_str()
952 .map(String::from)
953 .unwrap_or_else(|| output_path.display().to_string()),
954 file_size,
955 duration: duration_approx,
956 video_bitrate: video_bitrate_approx,
957 audio_bitrate: audio_bitrate_approx,
958 encoding_time,
959 speed_factor,
960 })
961 }
962}
963
964async fn drain_packets_with_gain<D, M>(
976 demuxer: &mut D,
977 muxer: &mut M,
978 _progress: &Option<ProgressTracker>,
979 audio_stream_indices: &[usize],
980 gain_db: f64,
981) -> Result<PassStats>
982where
983 D: Demuxer,
984 M: Muxer,
985{
986 let mut stats = PassStats::default();
987 let gain_linear = 10f64.powf(gain_db / 20.0) as f32;
989 let apply_gain = gain_db.abs() > 0.01 && !audio_stream_indices.is_empty();
990
991 loop {
992 match demuxer.read_packet().await {
993 Ok(mut pkt) => {
994 if pkt.should_discard() {
995 continue;
996 }
997
998 let raw_len = pkt.data.len() as u64;
999 stats.bytes_in += raw_len;
1000
1001 if audio_stream_indices.contains(&pkt.stream_index) {
1002 if apply_gain {
1004 pkt.data = apply_i16_gain(pkt.data, gain_linear);
1005 }
1006 stats.audio_frames += 1;
1007 } else {
1008 stats.video_frames += 1;
1009 }
1010
1011 let out_len = pkt.data.len() as u64;
1012 stats.bytes_out += out_len;
1013
1014 muxer
1015 .write_packet(&pkt)
1016 .await
1017 .map_err(|e| TranscodeError::ContainerError(e.to_string()))?;
1018
1019 let total = stats.video_frames + stats.audio_frames;
1020 if total % 500 == 0 {
1021 debug!(
1022 "Remuxed {} packets ({} video, {} audio)",
1023 total, stats.video_frames, stats.audio_frames
1024 );
1025 }
1026 }
1027 Err(e) if e.is_eof() => break,
1028 Err(e) => {
1029 return Err(TranscodeError::ContainerError(format!(
1030 "Error reading packet: {}",
1031 e
1032 )));
1033 }
1034 }
1035 }
1036
1037 debug!(
1038 "drain_packets_with_gain: {} video frames, {} audio frames, \
1039 {} bytes in, {} bytes out",
1040 stats.video_frames, stats.audio_frames, stats.bytes_in, stats.bytes_out
1041 );
1042 Ok(stats)
1043}
1044
1045fn apply_i16_gain(data: bytes::Bytes, gain_linear: f32) -> bytes::Bytes {
1051 if (gain_linear - 1.0).abs() < f32::EPSILON {
1052 return data;
1053 }
1054 let mut buf: Vec<u8> = data.into();
1055 let n_samples = buf.len() / 2;
1056 for i in 0..n_samples {
1057 let lo = buf[i * 2];
1058 let hi = buf[i * 2 + 1];
1059 let sample = i16::from_le_bytes([lo, hi]) as f32;
1060 let gained = (sample * gain_linear).clamp(i16::MIN as f32, i16::MAX as f32) as i16;
1061 let out = gained.to_le_bytes();
1062 buf[i * 2] = out[0];
1063 buf[i * 2 + 1] = out[1];
1064 }
1065 bytes::Bytes::from(buf)
1066}
1067
1068async fn count_packets<D: Demuxer>(demuxer: &mut D) -> u64 {
1070 let mut count: u64 = 0;
1071 loop {
1072 match demuxer.read_packet().await {
1073 Ok(_) => count += 1,
1074 Err(e) if e.is_eof() => break,
1075 Err(_) => break,
1076 }
1077 }
1078 count
1079}
1080
1081async fn feed_audio_packets_to_meter<D: Demuxer>(demuxer: &mut D, meter: &mut LoudnessMeter) {
1087 let audio_stream_indices: Vec<usize> = demuxer
1088 .streams()
1089 .iter()
1090 .filter(|s| s.is_audio())
1091 .map(|s| s.index)
1092 .collect();
1093
1094 loop {
1095 match demuxer.read_packet().await {
1096 Ok(pkt) => {
1097 if !audio_stream_indices.contains(&pkt.stream_index) {
1098 continue;
1099 }
1100 let samples = bytes_as_f32_samples(&pkt.data);
1102 if !samples.is_empty() {
1103 meter.process_f32(&samples);
1104 }
1105 }
1106 Err(e) if e.is_eof() => break,
1107 Err(_) => break,
1108 }
1109 }
1110}
1111
/// Reinterprets `data` as a sequence of little-endian `f32` values.
///
/// Every complete group of four bytes becomes one sample; up to three
/// trailing bytes that do not form a full group are ignored.
fn bytes_as_f32_samples(data: &[u8]) -> Vec<f32> {
    data.chunks_exact(4)
        .map(|word| f32::from_le_bytes([word[0], word[1], word[2], word[3]]))
        .collect()
}
1124
/// Estimates a duration in seconds from a file size alone.
///
/// The estimate assumes a constant 625 000 bytes per second (5 Mbit/s) and is
/// therefore only a rough figure.
fn derive_duration_approx(file_size: u64) -> f64 {
    const BYTES_PER_SECOND: f64 = 625_000.0;
    let size_bytes = file_size as f64;
    size_bytes / BYTES_PER_SECOND
}
1135
/// Public entry point that holds a [`PipelineConfig`] and runs it through a
/// fresh [`Pipeline`] on every `execute` call. Built via `TranscodePipeline::builder`.
pub struct TranscodePipeline {
    /// Settings used for each run.
    config: PipelineConfig,
}
1142
1143impl TranscodePipeline {
1144 #[must_use]
1146 pub fn builder() -> TranscodePipelineBuilder {
1147 TranscodePipelineBuilder::new()
1148 }
1149
1150 pub fn set_video_codec(&mut self, codec: &str) {
1152 self.config.video_codec = Some(codec.to_string());
1153 }
1154
1155 pub fn set_audio_codec(&mut self, codec: &str) {
1157 self.config.audio_codec = Some(codec.to_string());
1158 }
1159
1160 pub async fn execute(&mut self) -> crate::Result<TranscodeOutput> {
1166 let mut pipeline = Pipeline::new(self.config.clone());
1167 pipeline.execute().await
1168 }
1169}
1170
/// Builder for [`TranscodePipeline`]; `input` and `output` are mandatory.
pub struct TranscodePipelineBuilder {
    /// Input file path (required).
    input: Option<PathBuf>,
    /// Output file path (required).
    output: Option<PathBuf>,
    /// Requested video codec name, if any.
    video_codec: Option<String>,
    /// Requested audio codec name, if any.
    audio_codec: Option<String>,
    /// Quality settings, if any.
    quality: Option<QualityConfig>,
    /// Multi-pass mode; turned into a `MultiPassConfig` by `build`.
    multipass: Option<MultiPassMode>,
    /// Loudness normalization settings, if any.
    normalization: Option<NormalizationConfig>,
    /// Progress-tracking flag (defaults to `false`).
    track_progress: bool,
    /// Hardware-acceleration flag (defaults to `true`).
    hw_accel: bool,
}
1185
1186impl TranscodePipelineBuilder {
1187 #[must_use]
1189 pub fn new() -> Self {
1190 Self {
1191 input: None,
1192 output: None,
1193 video_codec: None,
1194 audio_codec: None,
1195 quality: None,
1196 multipass: None,
1197 normalization: None,
1198 track_progress: false,
1199 hw_accel: true,
1200 }
1201 }
1202
1203 #[must_use]
1205 pub fn input(mut self, path: impl Into<PathBuf>) -> Self {
1206 self.input = Some(path.into());
1207 self
1208 }
1209
1210 #[must_use]
1212 pub fn output(mut self, path: impl Into<PathBuf>) -> Self {
1213 self.output = Some(path.into());
1214 self
1215 }
1216
1217 #[must_use]
1219 pub fn video_codec(mut self, codec: impl Into<String>) -> Self {
1220 self.video_codec = Some(codec.into());
1221 self
1222 }
1223
1224 #[must_use]
1226 pub fn audio_codec(mut self, codec: impl Into<String>) -> Self {
1227 self.audio_codec = Some(codec.into());
1228 self
1229 }
1230
1231 #[must_use]
1233 pub fn quality(mut self, quality: QualityConfig) -> Self {
1234 self.quality = Some(quality);
1235 self
1236 }
1237
1238 #[must_use]
1240 pub fn multipass(mut self, mode: MultiPassMode) -> Self {
1241 self.multipass = Some(mode);
1242 self
1243 }
1244
1245 #[must_use]
1247 pub fn normalization(mut self, config: NormalizationConfig) -> Self {
1248 self.normalization = Some(config);
1249 self
1250 }
1251
1252 #[must_use]
1254 pub fn track_progress(mut self, enable: bool) -> Self {
1255 self.track_progress = enable;
1256 self
1257 }
1258
1259 #[must_use]
1261 pub fn hw_accel(mut self, enable: bool) -> Self {
1262 self.hw_accel = enable;
1263 self
1264 }
1265
1266 pub fn build(self) -> crate::Result<TranscodePipeline> {
1272 let input = self
1273 .input
1274 .ok_or_else(|| TranscodeError::InvalidInput("Input path not specified".to_string()))?;
1275
1276 let output = self.output.ok_or_else(|| {
1277 TranscodeError::InvalidOutput("Output path not specified".to_string())
1278 })?;
1279
1280 let multipass_config = self.multipass.map(|mode| {
1281 MultiPassConfig::new(
1282 mode,
1283 std::env::temp_dir().join("oximedia-transcode-stats.log"),
1284 )
1285 });
1286
1287 Ok(TranscodePipeline {
1288 config: PipelineConfig {
1289 input,
1290 output,
1291 video_codec: self.video_codec,
1292 audio_codec: self.audio_codec,
1293 quality: self.quality,
1294 multipass: multipass_config,
1295 normalization: self.normalization,
1296 track_progress: self.track_progress,
1297 hw_accel: self.hw_accel,
1298 },
1299 })
1300 }
1301}
1302
/// `Default` is equivalent to [`TranscodePipelineBuilder::new`].
impl Default for TranscodePipelineBuilder {
    fn default() -> Self {
        Self::new()
    }
}
1308
1309#[cfg(test)]
1312mod tests {
1313 use super::*;
1314
    /// Input path inside the system temp directory used by the builder tests
    /// (the file itself is never created here).
    fn tmp_in() -> PathBuf {
        std::env::temp_dir().join("oximedia-transcode-pipeline-input.mkv")
    }
1318
    /// Output path inside the system temp directory used by the builder tests.
    fn tmp_out() -> PathBuf {
        std::env::temp_dir().join("oximedia-transcode-pipeline-output.mkv")
    }
1322
1323 #[test]
1324 fn test_pipeline_builder() {
1325 let ti = tmp_in();
1326 let to = tmp_out();
1327 let result = TranscodePipelineBuilder::new()
1328 .input(ti.clone())
1329 .output(to.clone())
1330 .video_codec("vp9")
1331 .audio_codec("opus")
1332 .track_progress(true)
1333 .hw_accel(false)
1334 .build();
1335
1336 assert!(result.is_ok());
1337 let pipeline = result.expect("should succeed in test");
1338 assert_eq!(pipeline.config.input, ti);
1339 assert_eq!(pipeline.config.output, to);
1340 assert_eq!(pipeline.config.video_codec, Some("vp9".to_string()));
1341 assert_eq!(pipeline.config.audio_codec, Some("opus".to_string()));
1342 assert!(pipeline.config.track_progress);
1343 assert!(!pipeline.config.hw_accel);
1344 }
1345
1346 #[test]
1347 fn test_pipeline_builder_missing_input() {
1348 let result = TranscodePipelineBuilder::new().output(tmp_out()).build();
1349 assert!(result.is_err());
1350 }
1351
1352 #[test]
1353 fn test_pipeline_builder_missing_output() {
1354 let result = TranscodePipelineBuilder::new().input(tmp_in()).build();
1355 assert!(result.is_err());
1356 }
1357
1358 #[test]
1359 fn test_pipeline_stage_flow() {
1360 let config = PipelineConfig {
1361 input: tmp_in(),
1362 output: tmp_out(),
1363 video_codec: None,
1364 audio_codec: None,
1365 quality: None,
1366 multipass: None,
1367 normalization: None,
1368 track_progress: false,
1369 hw_accel: true,
1370 };
1371
1372 let pipeline = Pipeline::new(config);
1373 assert!(matches!(
1374 pipeline.current_stage(),
1375 PipelineStage::Validation
1376 ));
1377 }
1378
1379 #[test]
1380 fn test_output_format_from_path() {
1381 assert!(matches!(
1382 output_format_from_path(std::path::Path::new("out.mkv")),
1383 ContainerFormat::Matroska
1384 ));
1385 assert!(matches!(
1386 output_format_from_path(std::path::Path::new("out.webm")),
1387 ContainerFormat::Matroska
1388 ));
1389 assert!(matches!(
1390 output_format_from_path(std::path::Path::new("out.ogg")),
1391 ContainerFormat::Ogg
1392 ));
1393 }
1394
1395 #[test]
1396 fn test_bytes_as_f32_samples_empty() {
1397 let samples = bytes_as_f32_samples(&[]);
1398 assert!(samples.is_empty());
1399 }
1400
1401 #[test]
1402 fn test_bytes_as_f32_samples_partial() {
1403 let data = [0u8; 7];
1405 let samples = bytes_as_f32_samples(&data);
1406 assert_eq!(samples.len(), 1);
1407 }
1408
1409 #[test]
1410 fn test_bytes_as_f32_known_value() {
1411 let data = [0x00u8, 0x00, 0x00, 0x00];
1413 let samples = bytes_as_f32_samples(&data);
1414 assert_eq!(samples.len(), 1);
1415 assert_eq!(samples[0], 0.0f32);
1416 }
1417
1418 #[test]
1421 fn test_apply_i16_gain_unity() {
1422 let sample: i16 = 1234;
1424 let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1425 let out = apply_i16_gain(raw.clone(), 1.0);
1426 assert_eq!(&out[..], &raw[..]);
1427 }
1428
1429 #[test]
1430 fn test_apply_i16_gain_double() {
1431 let sample: i16 = 1000;
1433 let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1434 let out = apply_i16_gain(raw, 2.0);
1435 let result = i16::from_le_bytes([out[0], out[1]]);
1436 assert_eq!(result, 2000);
1437 }
1438
1439 #[test]
1440 fn test_apply_i16_gain_clamp_positive() {
1441 let sample: i16 = i16::MAX;
1443 let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1444 let out = apply_i16_gain(raw, 2.0);
1445 let result = i16::from_le_bytes([out[0], out[1]]);
1446 assert_eq!(result, i16::MAX);
1447 }
1448
1449 #[test]
1450 fn test_apply_i16_gain_clamp_negative() {
1451 let sample: i16 = i16::MIN;
1453 let raw = bytes::Bytes::from(sample.to_le_bytes().to_vec());
1454 let out = apply_i16_gain(raw, 2.0);
1455 let result = i16::from_le_bytes([out[0], out[1]]);
1456 assert_eq!(result, i16::MIN);
1457 }
1458
#[test]
// Attenuation case: 2000 scaled by 0.5 is expected to come back as 1000.
fn test_apply_i16_gain_half() {
    let input = bytes::Bytes::from(Vec::from(2000i16.to_le_bytes()));

    let scaled = apply_i16_gain(input, 0.5);

    let decoded = i16::from_le_bytes([scaled[0], scaled[1]]);
    assert_eq!(decoded, 1000);
}
1468
#[test]
// Input with an odd number of bytes: one complete little-endian i16
// (i16::MAX, i.e. 0xFF 0x7F) followed by a single stray byte.
fn test_apply_i16_gain_odd_byte_length() {
    let mut raw_bytes = i16::MAX.to_le_bytes().to_vec();
    raw_bytes.push(0xAB);
    let input = bytes::Bytes::from(raw_bytes);

    let scaled = apply_i16_gain(input, 2.0);

    // The complete sample is expected to clamp at i16::MAX ...
    let decoded = i16::from_le_bytes([scaled[0], scaled[1]]);
    assert_eq!(decoded, i16::MAX);
    // ... and the stray trailing byte is expected to come back unchanged.
    assert_eq!(scaled[2], 0xAB);
}
1480
#[test]
// A default-constructed `PassStats` must start with every counter at zero.
// The exhaustive destructuring also makes this test fail to compile if a
// field is added without being covered here.
fn test_pass_stats_default() {
    let PassStats {
        bytes_in,
        bytes_out,
        video_frames,
        audio_frames,
    } = PassStats::default();

    assert_eq!(bytes_in, 0);
    assert_eq!(bytes_out, 0);
    assert_eq!(video_frames, 0);
    assert_eq!(audio_frames, 0);
}
1491
1492 #[tokio::test]
1500 async fn test_pipeline_execute_remux_produces_output() {
1501 use oximedia_container::{
1502 mux::{MatroskaMuxer, MuxerConfig},
1503 Muxer, Packet, PacketFlags, StreamInfo,
1504 };
1505 use oximedia_core::{CodecId, Rational, Timestamp};
1506 use oximedia_io::MemorySource;
1507
1508 let in_buf = MemorySource::new_writable(64 * 1024);
1510 let mut muxer = MatroskaMuxer::new(in_buf, MuxerConfig::new());
1511
1512 let mut video = StreamInfo::new(0, CodecId::Vp9, Rational::new(1, 1000));
1513 video.codec_params.width = Some(320);
1514 video.codec_params.height = Some(240);
1515 muxer.add_stream(video).expect("add stream");
1516 muxer.write_header().await.expect("write header");
1517
1518 for i in 0u64..30 {
1520 let data = vec![0x42u8, 0x00, (i & 0xFF) as u8, 0x01];
1521 let pkt = Packet::new(
1522 0,
1523 bytes::Bytes::from(data),
1524 Timestamp::new(i as i64 * 33, Rational::new(1, 1000)),
1525 PacketFlags::KEYFRAME,
1526 );
1527 muxer.write_packet(&pkt).await.expect("write packet");
1528 }
1529 muxer.write_trailer().await.expect("write trailer");
1530
1531 let tmp_dir = std::env::temp_dir();
1533 let input_path = tmp_dir.join("pipeline_test_input.mkv");
1534 let output_path = tmp_dir.join("pipeline_test_output.mkv");
1535
1536 let sink = muxer.into_sink();
1537 let mkv_bytes = sink.written_data().to_vec();
1538 tokio::fs::write(&input_path, &mkv_bytes)
1539 .await
1540 .expect("write temp input");
1541
1542 let mut pipeline = TranscodePipelineBuilder::new()
1544 .input(input_path.clone())
1545 .output(output_path.clone())
1546 .build()
1547 .expect("build pipeline");
1548
1549 let result = pipeline.execute().await;
1550
1551 let _ = tokio::fs::remove_file(&input_path).await;
1553 let _ = tokio::fs::remove_file(&output_path).await;
1554
1555 let output = result.expect("pipeline execute should succeed");
1556
1557 assert!(
1559 output.file_size > 0,
1560 "output file size must be > 0, got {}",
1561 output.file_size
1562 );
1563 assert!(
1564 output.encoding_time >= 0.0,
1565 "encoding time must be non-negative"
1566 );
1567 }
1568
1569 #[tokio::test]
1572 async fn test_pipeline_execute_with_normalization_gain() {
1573 use crate::{LoudnessStandard, NormalizationConfig};
1574 use oximedia_container::{
1575 mux::{MatroskaMuxer, MuxerConfig},
1576 Muxer, Packet, PacketFlags, StreamInfo,
1577 };
1578 use oximedia_core::{CodecId, Rational, Timestamp};
1579 use oximedia_io::MemorySource;
1580
1581 let in_buf = MemorySource::new_writable(64 * 1024);
1583 let mut muxer = MatroskaMuxer::new(in_buf, MuxerConfig::new());
1584
1585 let mut audio = StreamInfo::new(0, CodecId::Opus, Rational::new(1, 48000));
1586 audio.codec_params.sample_rate = Some(48000);
1587 audio.codec_params.channels = Some(2);
1588 muxer.add_stream(audio).expect("add audio stream");
1589 muxer.write_header().await.expect("write header");
1590
1591 for i in 0u64..20 {
1593 let sample_le: i16 = 100;
1595 let mut data = Vec::with_capacity(32);
1596 for _ in 0..16 {
1597 data.extend_from_slice(&sample_le.to_le_bytes());
1598 }
1599 let pkt = Packet::new(
1600 0,
1601 bytes::Bytes::from(data),
1602 Timestamp::new(i as i64 * 960, Rational::new(1, 48000)),
1603 PacketFlags::KEYFRAME,
1604 );
1605 muxer.write_packet(&pkt).await.expect("write audio packet");
1606 }
1607 muxer.write_trailer().await.expect("write trailer");
1608
1609 let tmp_dir = std::env::temp_dir();
1610 let input_path = tmp_dir.join("pipeline_norm_input.mkv");
1611 let output_path = tmp_dir.join("pipeline_norm_output.mkv");
1612
1613 let sink = muxer.into_sink();
1614 let mkv_bytes = sink.written_data().to_vec();
1615 tokio::fs::write(&input_path, &mkv_bytes)
1616 .await
1617 .expect("write temp input");
1618
1619 let norm_config = NormalizationConfig::new(LoudnessStandard::EbuR128);
1621 let pipeline_config = PipelineConfig {
1623 input: input_path.clone(),
1624 output: output_path.clone(),
1625 video_codec: None,
1626 audio_codec: None,
1627 quality: None,
1628 multipass: None,
1629 normalization: Some(norm_config),
1630 track_progress: false,
1631 hw_accel: false,
1632 };
1633 let mut pipeline_inner = Pipeline::new(pipeline_config);
1634 pipeline_inner.normalization_gain_db = 6.0206;
1636 pipeline_inner.encode_start = Some(std::time::Instant::now());
1637 pipeline_inner.current_stage = PipelineStage::Encode;
1638
1639 let pass_stats = pipeline_inner.execute_single_pass().await;
1640
1641 let _ = tokio::fs::remove_file(&input_path).await;
1642 let _ = tokio::fs::remove_file(&output_path).await;
1643
1644 let stats = pass_stats.expect("single-pass should succeed");
1645 assert!(
1646 stats.audio_frames > 0,
1647 "must have processed at least one audio frame"
1648 );
1649 assert!(
1650 stats.bytes_in > 0,
1651 "must have read at least some bytes from input"
1652 );
1653 assert!(
1654 stats.bytes_out > 0,
1655 "must have written at least some bytes to output"
1656 );
1657 }
1658}