Skip to main content

oximedia_transcode/
pipeline.rs

1//! Transcoding pipeline orchestration and execution.
2
3use crate::{
4    MultiPassConfig, MultiPassEncoder, MultiPassMode, NormalizationConfig, ProgressTracker,
5    QualityConfig, Result, TranscodeError, TranscodeOutput,
6};
7use std::path::PathBuf;
8
9/// Pipeline stage in the transcoding workflow.
10#[derive(Debug, Clone)]
11pub enum PipelineStage {
12    /// Input validation stage.
13    Validation,
14    /// Audio analysis stage (for normalization).
15    AudioAnalysis,
16    /// First pass encoding stage (analysis).
17    FirstPass,
18    /// Second pass encoding stage (final).
19    SecondPass,
20    /// Third pass encoding stage (optional).
21    ThirdPass,
22    /// Final encoding stage.
23    Encode,
24    /// Output verification stage.
25    Verification,
26}
27
28/// Transcoding pipeline configuration.
29#[derive(Debug, Clone)]
30pub struct PipelineConfig {
31    /// Input file path.
32    pub input: PathBuf,
33    /// Output file path.
34    pub output: PathBuf,
35    /// Video codec name.
36    pub video_codec: Option<String>,
37    /// Audio codec name.
38    pub audio_codec: Option<String>,
39    /// Quality configuration.
40    pub quality: Option<QualityConfig>,
41    /// Multi-pass configuration.
42    pub multipass: Option<MultiPassConfig>,
43    /// Normalization configuration.
44    pub normalization: Option<NormalizationConfig>,
45    /// Enable progress tracking.
46    pub track_progress: bool,
47    /// Enable hardware acceleration.
48    pub hw_accel: bool,
49}
50
51/// Transcoding pipeline orchestrator.
52pub struct Pipeline {
53    config: PipelineConfig,
54    current_stage: PipelineStage,
55    progress_tracker: Option<ProgressTracker>,
56}
57
58impl Pipeline {
59    /// Creates a new pipeline with the given configuration.
60    #[must_use]
61    pub fn new(config: PipelineConfig) -> Self {
62        Self {
63            config,
64            current_stage: PipelineStage::Validation,
65            progress_tracker: None,
66        }
67    }
68
69    /// Sets the progress tracker.
70    pub fn set_progress_tracker(&mut self, tracker: ProgressTracker) {
71        self.progress_tracker = Some(tracker);
72    }
73
74    /// Executes the pipeline.
75    ///
76    /// # Errors
77    ///
78    /// Returns an error if any pipeline stage fails.
79    pub async fn execute(&mut self) -> Result<TranscodeOutput> {
80        // Validation stage
81        self.current_stage = PipelineStage::Validation;
82        self.validate()?;
83
84        // Audio analysis (if normalization enabled)
85        if self.config.normalization.is_some() {
86            self.current_stage = PipelineStage::AudioAnalysis;
87            self.analyze_audio().await?;
88        }
89
90        // Multi-pass encoding
91        if let Some(multipass_config) = &self.config.multipass {
92            let mut encoder = MultiPassEncoder::new(multipass_config.clone());
93
94            while encoder.has_more_passes() {
95                let pass = encoder.current_pass();
96                self.current_stage = match pass {
97                    1 => PipelineStage::FirstPass,
98                    2 => PipelineStage::SecondPass,
99                    _ => PipelineStage::ThirdPass,
100                };
101
102                self.execute_pass(pass, &encoder).await?;
103                encoder.next_pass();
104            }
105
106            // Cleanup statistics files
107            encoder.cleanup()?;
108        } else {
109            // Single-pass encoding
110            self.current_stage = PipelineStage::Encode;
111            self.execute_single_pass().await?;
112        }
113
114        // Verification
115        self.current_stage = PipelineStage::Verification;
116        self.verify_output().await
117    }
118
119    /// Gets the current pipeline stage.
120    #[must_use]
121    pub fn current_stage(&self) -> &PipelineStage {
122        &self.current_stage
123    }
124
125    fn validate(&self) -> Result<()> {
126        use crate::validation::{InputValidator, OutputValidator};
127
128        // Validate input
129        InputValidator::validate_path(
130            self.config
131                .input
132                .to_str()
133                .ok_or_else(|| TranscodeError::InvalidInput("Invalid input path".to_string()))?,
134        )?;
135
136        // Validate output
137        OutputValidator::validate_path(
138            self.config
139                .output
140                .to_str()
141                .ok_or_else(|| TranscodeError::InvalidOutput("Invalid output path".to_string()))?,
142            true,
143        )?;
144
145        Ok(())
146    }
147
148    async fn analyze_audio(&self) -> Result<()> {
149        // Placeholder for audio analysis
150        // In a real implementation, this would:
151        // 1. Scan the audio track
152        // 2. Measure loudness (LUFS)
153        // 3. Calculate required gain
154        Ok(())
155    }
156
157    async fn execute_pass(&self, _pass: u32, _encoder: &MultiPassEncoder) -> Result<()> {
158        // Placeholder for pass execution
159        // In a real implementation, this would:
160        // 1. Configure encoder for the specific pass
161        // 2. Process the input file
162        // 3. Generate statistics or output
163        Ok(())
164    }
165
166    async fn execute_single_pass(&self) -> Result<()> {
167        // Placeholder for single-pass execution
168        // In a real implementation, this would:
169        // 1. Configure encoder
170        // 2. Process the input file
171        // 3. Write output
172        Ok(())
173    }
174
175    async fn verify_output(&self) -> Result<TranscodeOutput> {
176        // Placeholder for output verification
177        // In a real implementation, this would:
178        // 1. Check output file exists
179        // 2. Verify it's playable
180        // 3. Collect statistics
181
182        Ok(TranscodeOutput {
183            output_path: self.config.output.to_str().unwrap_or("unknown").to_string(),
184            file_size: 0,
185            duration: 0.0,
186            video_bitrate: 0,
187            audio_bitrate: 0,
188            encoding_time: 0.0,
189            speed_factor: 1.0,
190        })
191    }
192}
193
194/// Builder for transcoding pipelines.
195pub struct TranscodePipeline {
196    config: PipelineConfig,
197}
198
199impl TranscodePipeline {
200    /// Creates a new pipeline builder.
201    #[must_use]
202    pub fn builder() -> TranscodePipelineBuilder {
203        TranscodePipelineBuilder::new()
204    }
205
206    /// Sets the video codec.
207    pub fn set_video_codec(&mut self, codec: &str) {
208        self.config.video_codec = Some(codec.to_string());
209    }
210
211    /// Sets the audio codec.
212    pub fn set_audio_codec(&mut self, codec: &str) {
213        self.config.audio_codec = Some(codec.to_string());
214    }
215
216    /// Executes the pipeline.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the pipeline execution fails.
221    pub async fn execute(&mut self) -> Result<TranscodeOutput> {
222        let mut pipeline = Pipeline::new(self.config.clone());
223        pipeline.execute().await
224    }
225}
226
227/// Builder for creating transcoding pipelines.
228pub struct TranscodePipelineBuilder {
229    input: Option<PathBuf>,
230    output: Option<PathBuf>,
231    video_codec: Option<String>,
232    audio_codec: Option<String>,
233    quality: Option<QualityConfig>,
234    multipass: Option<MultiPassMode>,
235    normalization: Option<NormalizationConfig>,
236    track_progress: bool,
237    hw_accel: bool,
238}
239
240impl TranscodePipelineBuilder {
241    /// Creates a new pipeline builder.
242    #[must_use]
243    pub fn new() -> Self {
244        Self {
245            input: None,
246            output: None,
247            video_codec: None,
248            audio_codec: None,
249            quality: None,
250            multipass: None,
251            normalization: None,
252            track_progress: false,
253            hw_accel: true,
254        }
255    }
256
257    /// Sets the input file.
258    #[must_use]
259    pub fn input(mut self, path: impl Into<PathBuf>) -> Self {
260        self.input = Some(path.into());
261        self
262    }
263
264    /// Sets the output file.
265    #[must_use]
266    pub fn output(mut self, path: impl Into<PathBuf>) -> Self {
267        self.output = Some(path.into());
268        self
269    }
270
271    /// Sets the video codec.
272    #[must_use]
273    pub fn video_codec(mut self, codec: impl Into<String>) -> Self {
274        self.video_codec = Some(codec.into());
275        self
276    }
277
278    /// Sets the audio codec.
279    #[must_use]
280    pub fn audio_codec(mut self, codec: impl Into<String>) -> Self {
281        self.audio_codec = Some(codec.into());
282        self
283    }
284
285    /// Sets the quality configuration.
286    #[must_use]
287    pub fn quality(mut self, quality: QualityConfig) -> Self {
288        self.quality = Some(quality);
289        self
290    }
291
292    /// Sets the multi-pass mode.
293    #[must_use]
294    pub fn multipass(mut self, mode: MultiPassMode) -> Self {
295        self.multipass = Some(mode);
296        self
297    }
298
299    /// Sets the normalization configuration.
300    #[must_use]
301    pub fn normalization(mut self, config: NormalizationConfig) -> Self {
302        self.normalization = Some(config);
303        self
304    }
305
306    /// Enables progress tracking.
307    #[must_use]
308    pub fn track_progress(mut self, enable: bool) -> Self {
309        self.track_progress = enable;
310        self
311    }
312
313    /// Enables hardware acceleration.
314    #[must_use]
315    pub fn hw_accel(mut self, enable: bool) -> Self {
316        self.hw_accel = enable;
317        self
318    }
319
320    /// Builds the transcoding pipeline.
321    ///
322    /// # Errors
323    ///
324    /// Returns an error if required fields are missing.
325    pub fn build(self) -> Result<TranscodePipeline> {
326        let input = self
327            .input
328            .ok_or_else(|| TranscodeError::InvalidInput("Input path not specified".to_string()))?;
329
330        let output = self.output.ok_or_else(|| {
331            TranscodeError::InvalidOutput("Output path not specified".to_string())
332        })?;
333
334        let multipass_config = self
335            .multipass
336            .map(|mode| MultiPassConfig::new(mode, "/tmp/transcode_stats.log"));
337
338        Ok(TranscodePipeline {
339            config: PipelineConfig {
340                input,
341                output,
342                video_codec: self.video_codec,
343                audio_codec: self.audio_codec,
344                quality: self.quality,
345                multipass: multipass_config,
346                normalization: self.normalization,
347                track_progress: self.track_progress,
348                hw_accel: self.hw_accel,
349            },
350        })
351    }
352}
353
354impl Default for TranscodePipelineBuilder {
355    fn default() -> Self {
356        Self::new()
357    }
358}
359
360#[cfg(test)]
361mod tests {
362    use super::*;
363
364    #[test]
365    fn test_pipeline_builder() {
366        let result = TranscodePipelineBuilder::new()
367            .input("/tmp/input.mp4")
368            .output("/tmp/output.mp4")
369            .video_codec("vp9")
370            .audio_codec("opus")
371            .track_progress(true)
372            .hw_accel(false)
373            .build();
374
375        assert!(result.is_ok());
376        let pipeline = result.expect("should succeed in test");
377        assert_eq!(pipeline.config.input, PathBuf::from("/tmp/input.mp4"));
378        assert_eq!(pipeline.config.output, PathBuf::from("/tmp/output.mp4"));
379        assert_eq!(pipeline.config.video_codec, Some("vp9".to_string()));
380        assert_eq!(pipeline.config.audio_codec, Some("opus".to_string()));
381        assert!(pipeline.config.track_progress);
382        assert!(!pipeline.config.hw_accel);
383    }
384
385    #[test]
386    fn test_pipeline_builder_missing_input() {
387        let result = TranscodePipelineBuilder::new()
388            .output("/tmp/output.mp4")
389            .build();
390
391        assert!(result.is_err());
392    }
393
394    #[test]
395    fn test_pipeline_builder_missing_output() {
396        let result = TranscodePipelineBuilder::new()
397            .input("/tmp/input.mp4")
398            .build();
399
400        assert!(result.is_err());
401    }
402
403    #[test]
404    fn test_pipeline_stage_flow() {
405        let config = PipelineConfig {
406            input: PathBuf::from("/tmp/input.mp4"),
407            output: PathBuf::from("/tmp/output.mp4"),
408            video_codec: None,
409            audio_codec: None,
410            quality: None,
411            multipass: None,
412            normalization: None,
413            track_progress: false,
414            hw_accel: true,
415        };
416
417        let pipeline = Pipeline::new(config);
418        assert!(matches!(
419            pipeline.current_stage(),
420            PipelineStage::Validation
421        ));
422    }
423}