Skip to main content

oximedia_transcode/
parallel.rs

1//! Parallel encoding for multiple outputs simultaneously.
2
3use crate::{Result, TranscodeConfig, TranscodeError, TranscodeOutput};
4use rayon::prelude::*;
5use std::sync::{Arc, Mutex};
6
7/// Configuration for parallel encoding.
8#[derive(Debug, Clone)]
9pub struct ParallelConfig {
10    /// Maximum number of parallel encodes.
11    pub max_parallel: usize,
12    /// CPU cores to use per encode.
13    pub cores_per_encode: Option<usize>,
14    /// Whether to use thread pools.
15    pub use_thread_pool: bool,
16    /// Priority for parallel jobs.
17    pub priority: ParallelPriority,
18}
19
20/// Priority levels for parallel jobs.
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub enum ParallelPriority {
23    /// Low priority (background processing).
24    Low,
25    /// Normal priority.
26    Normal,
27    /// High priority (time-sensitive).
28    High,
29}
30
31impl Default for ParallelConfig {
32    fn default() -> Self {
33        Self {
34            max_parallel: num_cpus(),
35            cores_per_encode: None,
36            use_thread_pool: true,
37            priority: ParallelPriority::Normal,
38        }
39    }
40}
41
42impl ParallelConfig {
43    /// Creates a new parallel config with automatic core detection.
44    #[must_use]
45    pub fn auto() -> Self {
46        Self::default()
47    }
48
49    /// Creates a config with a specific number of parallel jobs.
50    #[must_use]
51    pub fn with_max_parallel(max: usize) -> Self {
52        Self {
53            max_parallel: max,
54            ..Self::default()
55        }
56    }
57
58    /// Sets the number of cores per encode job.
59    #[must_use]
60    pub fn cores_per_encode(mut self, cores: usize) -> Self {
61        self.cores_per_encode = Some(cores);
62        self
63    }
64
65    /// Sets the priority level.
66    #[must_use]
67    pub fn priority(mut self, priority: ParallelPriority) -> Self {
68        self.priority = priority;
69        self
70    }
71
72    /// Validates the configuration.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if the configuration is invalid.
77    pub fn validate(&self) -> Result<()> {
78        if self.max_parallel == 0 {
79            return Err(TranscodeError::ValidationError(
80                crate::ValidationError::Unsupported(
81                    "max_parallel must be greater than 0".to_string(),
82                ),
83            ));
84        }
85
86        if let Some(cores) = self.cores_per_encode {
87            if cores == 0 {
88                return Err(TranscodeError::ValidationError(
89                    crate::ValidationError::Unsupported(
90                        "cores_per_encode must be greater than 0".to_string(),
91                    ),
92                ));
93            }
94        }
95
96        Ok(())
97    }
98}
99
100/// Gets the number of CPU cores available.
101///
102/// Falls back to 4 if the system query fails.
103fn num_cpus() -> usize {
104    std::thread::available_parallelism()
105        .map(std::num::NonZero::get)
106        .unwrap_or(4) // unwrap_or is safe — this is a fallback, not unwrap()
107}
108
109/// Parallel encoder for processing multiple outputs simultaneously.
110pub struct ParallelEncoder {
111    config: ParallelConfig,
112    jobs: Vec<TranscodeConfig>,
113    results: Arc<Mutex<Vec<Result<TranscodeOutput>>>>,
114}
115
116impl ParallelEncoder {
117    /// Creates a new parallel encoder with the given configuration.
118    #[must_use]
119    pub fn new(config: ParallelConfig) -> Self {
120        Self {
121            config,
122            jobs: Vec::new(),
123            results: Arc::new(Mutex::new(Vec::new())),
124        }
125    }
126
127    /// Adds a job to the parallel encoder.
128    pub fn add_job(&mut self, job: TranscodeConfig) {
129        self.jobs.push(job);
130    }
131
132    /// Adds multiple jobs at once.
133    pub fn add_jobs(&mut self, jobs: Vec<TranscodeConfig>) {
134        self.jobs.extend(jobs);
135    }
136
137    /// Gets the number of jobs queued.
138    #[must_use]
139    pub fn job_count(&self) -> usize {
140        self.jobs.len()
141    }
142
143    /// Executes all jobs in parallel.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the configuration is invalid. Individual job errors
148    /// are captured in the results.
149    pub async fn execute_all(&mut self) -> Result<Vec<Result<TranscodeOutput>>> {
150        self.config.validate()?;
151
152        // Configure thread pool
153        let pool = rayon::ThreadPoolBuilder::new()
154            .num_threads(self.config.max_parallel)
155            .build()
156            .map_err(|e| {
157                TranscodeError::PipelineError(format!("Failed to create thread pool: {e}"))
158            })?;
159
160        let jobs = std::mem::take(&mut self.jobs);
161
162        // Execute jobs in parallel and collect results directly.
163        let job_results: Vec<Result<TranscodeOutput>> = pool.install(|| {
164            jobs.into_par_iter()
165                .map(Self::execute_job)
166                .collect::<Vec<_>>()
167        });
168
169        // Store results for later retrieval.
170        match self.results.lock() {
171            Ok(mut guard) => {
172                guard.extend(job_results.iter().cloned());
173            }
174            Err(poisoned) => {
175                poisoned.into_inner().extend(job_results.iter().cloned());
176            }
177        }
178
179        Ok(job_results)
180    }
181
182    /// Executes all jobs sequentially (for debugging).
183    ///
184    /// # Errors
185    ///
186    /// Returns an error if any job fails.
187    pub async fn execute_sequential(&mut self) -> Result<Vec<TranscodeOutput>> {
188        let mut outputs = Vec::new();
189
190        for job in &self.jobs {
191            let output = Self::execute_job(job.clone())?;
192            outputs.push(output);
193        }
194
195        Ok(outputs)
196    }
197
198    /// Executes a single transcode job synchronously.
199    ///
200    /// Validates the job configuration and then delegates to the
201    /// pipeline builder for actual transcoding. The pipeline is
202    /// executed on a per-thread tokio runtime so that async I/O
203    /// works within the rayon thread pool.
204    fn execute_job(job: TranscodeConfig) -> Result<TranscodeOutput> {
205        let input = job
206            .input
207            .as_deref()
208            .ok_or_else(|| TranscodeError::InvalidInput("No input file specified".to_string()))?;
209
210        let output = job
211            .output
212            .as_deref()
213            .ok_or_else(|| TranscodeError::InvalidOutput("No output file specified".to_string()))?;
214
215        // Build a pipeline from the job config.
216        let mut pipeline_builder = crate::pipeline::TranscodePipelineBuilder::new()
217            .input(input)
218            .output(output);
219
220        if let Some(ref vc) = job.video_codec {
221            pipeline_builder = pipeline_builder.video_codec(vc);
222        }
223        if let Some(ref ac) = job.audio_codec {
224            pipeline_builder = pipeline_builder.audio_codec(ac);
225        }
226        if let Some(mode) = job.multi_pass {
227            pipeline_builder = pipeline_builder.multipass(mode);
228        }
229
230        let mut pipeline = pipeline_builder.build()?;
231
232        // Create a per-thread tokio runtime to drive the async pipeline.
233        let rt = tokio::runtime::Builder::new_current_thread()
234            .enable_all()
235            .build()
236            .map_err(|e| {
237                TranscodeError::PipelineError(format!("Failed to create async runtime: {e}"))
238            })?;
239
240        rt.block_on(pipeline.execute())
241    }
242
243    /// Gets the results of completed jobs.
244    #[must_use]
245    pub fn get_results(&self) -> Vec<Result<TranscodeOutput>> {
246        match self.results.lock() {
247            Ok(guard) => guard.clone(),
248            Err(poisoned) => poisoned.into_inner().clone(),
249        }
250    }
251
252    /// Clears all jobs and results.
253    pub fn clear(&mut self) {
254        self.jobs.clear();
255        match self.results.lock() {
256            Ok(mut guard) => guard.clear(),
257            Err(poisoned) => poisoned.into_inner().clear(),
258        }
259    }
260}
261
262/// Builder for creating parallel encode jobs.
263pub struct ParallelEncodeBuilder {
264    config: ParallelConfig,
265    jobs: Vec<TranscodeConfig>,
266}
267
268impl ParallelEncodeBuilder {
269    /// Creates a new parallel encode builder.
270    #[must_use]
271    pub fn new() -> Self {
272        Self {
273            config: ParallelConfig::default(),
274            jobs: Vec::new(),
275        }
276    }
277
278    /// Sets the maximum number of parallel jobs.
279    #[must_use]
280    pub fn max_parallel(mut self, max: usize) -> Self {
281        self.config.max_parallel = max;
282        self
283    }
284
285    /// Sets cores per encode job.
286    #[must_use]
287    pub fn cores_per_encode(mut self, cores: usize) -> Self {
288        self.config.cores_per_encode = Some(cores);
289        self
290    }
291
292    /// Sets the priority level.
293    #[must_use]
294    pub fn priority(mut self, priority: ParallelPriority) -> Self {
295        self.config.priority = priority;
296        self
297    }
298
299    /// Adds a job to the builder.
300    #[must_use]
301    pub fn add_job(mut self, job: TranscodeConfig) -> Self {
302        self.jobs.push(job);
303        self
304    }
305
306    /// Adds multiple jobs.
307    #[must_use]
308    pub fn add_jobs(mut self, jobs: Vec<TranscodeConfig>) -> Self {
309        self.jobs.extend(jobs);
310        self
311    }
312
313    /// Builds the parallel encoder.
314    #[must_use]
315    pub fn build(self) -> ParallelEncoder {
316        let mut encoder = ParallelEncoder::new(self.config);
317        encoder.add_jobs(self.jobs);
318        encoder
319    }
320}
321
322impl Default for ParallelEncodeBuilder {
323    fn default() -> Self {
324        Self::new()
325    }
326}
327
328#[cfg(test)]
329mod tests {
330    use super::*;
331
332    #[test]
333    fn test_parallel_config_default() {
334        let config = ParallelConfig::default();
335        assert!(config.max_parallel > 0);
336        assert_eq!(config.priority, ParallelPriority::Normal);
337        assert!(config.use_thread_pool);
338    }
339
340    #[test]
341    fn test_parallel_config_validation() {
342        let valid = ParallelConfig::with_max_parallel(4);
343        assert!(valid.validate().is_ok());
344
345        let invalid = ParallelConfig {
346            max_parallel: 0,
347            ..Default::default()
348        };
349        assert!(invalid.validate().is_err());
350    }
351
352    #[test]
353    fn test_parallel_config_cores_validation() {
354        let valid = ParallelConfig::default().cores_per_encode(2);
355        assert!(valid.validate().is_ok());
356
357        let invalid = ParallelConfig::default().cores_per_encode(0);
358        assert!(invalid.validate().is_err());
359    }
360
361    #[test]
362    fn test_parallel_encoder_job_count() {
363        let mut encoder = ParallelEncoder::new(ParallelConfig::default());
364        assert_eq!(encoder.job_count(), 0);
365
366        let job = TranscodeConfig {
367            input: Some("/tmp/input.mp4".to_string()),
368            output: Some("/tmp/output.mp4".to_string()),
369            ..Default::default()
370        };
371
372        encoder.add_job(job);
373        assert_eq!(encoder.job_count(), 1);
374    }
375
376    #[test]
377    fn test_parallel_encoder_add_jobs() {
378        let mut encoder = ParallelEncoder::new(ParallelConfig::default());
379
380        let jobs = vec![
381            TranscodeConfig {
382                input: Some("/tmp/input1.mp4".to_string()),
383                output: Some("/tmp/output1.mp4".to_string()),
384                ..Default::default()
385            },
386            TranscodeConfig {
387                input: Some("/tmp/input2.mp4".to_string()),
388                output: Some("/tmp/output2.mp4".to_string()),
389                ..Default::default()
390            },
391        ];
392
393        encoder.add_jobs(jobs);
394        assert_eq!(encoder.job_count(), 2);
395    }
396
397    #[test]
398    fn test_parallel_encoder_clear() {
399        let mut encoder = ParallelEncoder::new(ParallelConfig::default());
400
401        let job = TranscodeConfig {
402            input: Some("/tmp/input.mp4".to_string()),
403            output: Some("/tmp/output.mp4".to_string()),
404            ..Default::default()
405        };
406
407        encoder.add_job(job);
408        assert_eq!(encoder.job_count(), 1);
409
410        encoder.clear();
411        assert_eq!(encoder.job_count(), 0);
412    }
413
414    #[test]
415    fn test_parallel_builder() {
416        let job = TranscodeConfig {
417            input: Some("/tmp/input.mp4".to_string()),
418            output: Some("/tmp/output.mp4".to_string()),
419            ..Default::default()
420        };
421
422        let encoder = ParallelEncodeBuilder::new()
423            .max_parallel(4)
424            .cores_per_encode(2)
425            .priority(ParallelPriority::High)
426            .add_job(job)
427            .build();
428
429        assert_eq!(encoder.config.max_parallel, 4);
430        assert_eq!(encoder.config.cores_per_encode, Some(2));
431        assert_eq!(encoder.config.priority, ParallelPriority::High);
432        assert_eq!(encoder.job_count(), 1);
433    }
434
435    #[test]
436    fn test_num_cpus() {
437        let cpus = num_cpus();
438        assert!(cpus > 0);
439        assert!(cpus <= 1024); // Reasonable upper bound
440    }
441}