oximedia_transcode/
parallel.rs1use crate::{Result, TranscodeConfig, TranscodeError, TranscodeOutput};
4use rayon::prelude::*;
5use std::sync::{Arc, Mutex};
6
7#[derive(Debug, Clone)]
9pub struct ParallelConfig {
10 pub max_parallel: usize,
12 pub cores_per_encode: Option<usize>,
14 pub use_thread_pool: bool,
16 pub priority: ParallelPriority,
18}
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub enum ParallelPriority {
23 Low,
25 Normal,
27 High,
29}
30
31impl Default for ParallelConfig {
32 fn default() -> Self {
33 Self {
34 max_parallel: num_cpus(),
35 cores_per_encode: None,
36 use_thread_pool: true,
37 priority: ParallelPriority::Normal,
38 }
39 }
40}
41
42impl ParallelConfig {
43 #[must_use]
45 pub fn auto() -> Self {
46 Self::default()
47 }
48
49 #[must_use]
51 pub fn with_max_parallel(max: usize) -> Self {
52 Self {
53 max_parallel: max,
54 ..Self::default()
55 }
56 }
57
58 #[must_use]
60 pub fn cores_per_encode(mut self, cores: usize) -> Self {
61 self.cores_per_encode = Some(cores);
62 self
63 }
64
65 #[must_use]
67 pub fn priority(mut self, priority: ParallelPriority) -> Self {
68 self.priority = priority;
69 self
70 }
71
72 pub fn validate(&self) -> Result<()> {
78 if self.max_parallel == 0 {
79 return Err(TranscodeError::ValidationError(
80 crate::ValidationError::Unsupported(
81 "max_parallel must be greater than 0".to_string(),
82 ),
83 ));
84 }
85
86 if let Some(cores) = self.cores_per_encode {
87 if cores == 0 {
88 return Err(TranscodeError::ValidationError(
89 crate::ValidationError::Unsupported(
90 "cores_per_encode must be greater than 0".to_string(),
91 ),
92 ));
93 }
94 }
95
96 Ok(())
97 }
98}
99
100fn num_cpus() -> usize {
104 std::thread::available_parallelism()
105 .map(std::num::NonZero::get)
106 .unwrap_or(4) }
108
109pub struct ParallelEncoder {
111 config: ParallelConfig,
112 jobs: Vec<TranscodeConfig>,
113 results: Arc<Mutex<Vec<Result<TranscodeOutput>>>>,
114}
115
116impl ParallelEncoder {
117 #[must_use]
119 pub fn new(config: ParallelConfig) -> Self {
120 Self {
121 config,
122 jobs: Vec::new(),
123 results: Arc::new(Mutex::new(Vec::new())),
124 }
125 }
126
127 pub fn add_job(&mut self, job: TranscodeConfig) {
129 self.jobs.push(job);
130 }
131
132 pub fn add_jobs(&mut self, jobs: Vec<TranscodeConfig>) {
134 self.jobs.extend(jobs);
135 }
136
137 #[must_use]
139 pub fn job_count(&self) -> usize {
140 self.jobs.len()
141 }
142
143 pub async fn execute_all(&mut self) -> Result<Vec<Result<TranscodeOutput>>> {
150 self.config.validate()?;
151
152 let pool = rayon::ThreadPoolBuilder::new()
154 .num_threads(self.config.max_parallel)
155 .build()
156 .map_err(|e| {
157 TranscodeError::PipelineError(format!("Failed to create thread pool: {e}"))
158 })?;
159
160 let jobs = std::mem::take(&mut self.jobs);
161
162 let job_results: Vec<Result<TranscodeOutput>> = pool.install(|| {
164 jobs.into_par_iter()
165 .map(Self::execute_job)
166 .collect::<Vec<_>>()
167 });
168
169 match self.results.lock() {
171 Ok(mut guard) => {
172 guard.extend(job_results.iter().cloned());
173 }
174 Err(poisoned) => {
175 poisoned.into_inner().extend(job_results.iter().cloned());
176 }
177 }
178
179 Ok(job_results)
180 }
181
182 pub async fn execute_sequential(&mut self) -> Result<Vec<TranscodeOutput>> {
188 let mut outputs = Vec::new();
189
190 for job in &self.jobs {
191 let output = Self::execute_job(job.clone())?;
192 outputs.push(output);
193 }
194
195 Ok(outputs)
196 }
197
198 fn execute_job(job: TranscodeConfig) -> Result<TranscodeOutput> {
205 let input = job
206 .input
207 .as_deref()
208 .ok_or_else(|| TranscodeError::InvalidInput("No input file specified".to_string()))?;
209
210 let output = job
211 .output
212 .as_deref()
213 .ok_or_else(|| TranscodeError::InvalidOutput("No output file specified".to_string()))?;
214
215 let mut pipeline_builder = crate::pipeline::TranscodePipelineBuilder::new()
217 .input(input)
218 .output(output);
219
220 if let Some(ref vc) = job.video_codec {
221 pipeline_builder = pipeline_builder.video_codec(vc);
222 }
223 if let Some(ref ac) = job.audio_codec {
224 pipeline_builder = pipeline_builder.audio_codec(ac);
225 }
226 if let Some(mode) = job.multi_pass {
227 pipeline_builder = pipeline_builder.multipass(mode);
228 }
229
230 let mut pipeline = pipeline_builder.build()?;
231
232 let rt = tokio::runtime::Builder::new_current_thread()
234 .enable_all()
235 .build()
236 .map_err(|e| {
237 TranscodeError::PipelineError(format!("Failed to create async runtime: {e}"))
238 })?;
239
240 rt.block_on(pipeline.execute())
241 }
242
243 #[must_use]
245 pub fn get_results(&self) -> Vec<Result<TranscodeOutput>> {
246 match self.results.lock() {
247 Ok(guard) => guard.clone(),
248 Err(poisoned) => poisoned.into_inner().clone(),
249 }
250 }
251
252 pub fn clear(&mut self) {
254 self.jobs.clear();
255 match self.results.lock() {
256 Ok(mut guard) => guard.clear(),
257 Err(poisoned) => poisoned.into_inner().clear(),
258 }
259 }
260}
261
262pub struct ParallelEncodeBuilder {
264 config: ParallelConfig,
265 jobs: Vec<TranscodeConfig>,
266}
267
268impl ParallelEncodeBuilder {
269 #[must_use]
271 pub fn new() -> Self {
272 Self {
273 config: ParallelConfig::default(),
274 jobs: Vec::new(),
275 }
276 }
277
278 #[must_use]
280 pub fn max_parallel(mut self, max: usize) -> Self {
281 self.config.max_parallel = max;
282 self
283 }
284
285 #[must_use]
287 pub fn cores_per_encode(mut self, cores: usize) -> Self {
288 self.config.cores_per_encode = Some(cores);
289 self
290 }
291
292 #[must_use]
294 pub fn priority(mut self, priority: ParallelPriority) -> Self {
295 self.config.priority = priority;
296 self
297 }
298
299 #[must_use]
301 pub fn add_job(mut self, job: TranscodeConfig) -> Self {
302 self.jobs.push(job);
303 self
304 }
305
306 #[must_use]
308 pub fn add_jobs(mut self, jobs: Vec<TranscodeConfig>) -> Self {
309 self.jobs.extend(jobs);
310 self
311 }
312
313 #[must_use]
315 pub fn build(self) -> ParallelEncoder {
316 let mut encoder = ParallelEncoder::new(self.config);
317 encoder.add_jobs(self.jobs);
318 encoder
319 }
320}
321
322impl Default for ParallelEncodeBuilder {
323 fn default() -> Self {
324 Self::new()
325 }
326}
327
328#[cfg(test)]
329mod tests {
330 use super::*;
331
332 #[test]
333 fn test_parallel_config_default() {
334 let config = ParallelConfig::default();
335 assert!(config.max_parallel > 0);
336 assert_eq!(config.priority, ParallelPriority::Normal);
337 assert!(config.use_thread_pool);
338 }
339
340 #[test]
341 fn test_parallel_config_validation() {
342 let valid = ParallelConfig::with_max_parallel(4);
343 assert!(valid.validate().is_ok());
344
345 let invalid = ParallelConfig {
346 max_parallel: 0,
347 ..Default::default()
348 };
349 assert!(invalid.validate().is_err());
350 }
351
352 #[test]
353 fn test_parallel_config_cores_validation() {
354 let valid = ParallelConfig::default().cores_per_encode(2);
355 assert!(valid.validate().is_ok());
356
357 let invalid = ParallelConfig::default().cores_per_encode(0);
358 assert!(invalid.validate().is_err());
359 }
360
361 #[test]
362 fn test_parallel_encoder_job_count() {
363 let mut encoder = ParallelEncoder::new(ParallelConfig::default());
364 assert_eq!(encoder.job_count(), 0);
365
366 let job = TranscodeConfig {
367 input: Some("/tmp/input.mp4".to_string()),
368 output: Some("/tmp/output.mp4".to_string()),
369 ..Default::default()
370 };
371
372 encoder.add_job(job);
373 assert_eq!(encoder.job_count(), 1);
374 }
375
376 #[test]
377 fn test_parallel_encoder_add_jobs() {
378 let mut encoder = ParallelEncoder::new(ParallelConfig::default());
379
380 let jobs = vec![
381 TranscodeConfig {
382 input: Some("/tmp/input1.mp4".to_string()),
383 output: Some("/tmp/output1.mp4".to_string()),
384 ..Default::default()
385 },
386 TranscodeConfig {
387 input: Some("/tmp/input2.mp4".to_string()),
388 output: Some("/tmp/output2.mp4".to_string()),
389 ..Default::default()
390 },
391 ];
392
393 encoder.add_jobs(jobs);
394 assert_eq!(encoder.job_count(), 2);
395 }
396
397 #[test]
398 fn test_parallel_encoder_clear() {
399 let mut encoder = ParallelEncoder::new(ParallelConfig::default());
400
401 let job = TranscodeConfig {
402 input: Some("/tmp/input.mp4".to_string()),
403 output: Some("/tmp/output.mp4".to_string()),
404 ..Default::default()
405 };
406
407 encoder.add_job(job);
408 assert_eq!(encoder.job_count(), 1);
409
410 encoder.clear();
411 assert_eq!(encoder.job_count(), 0);
412 }
413
414 #[test]
415 fn test_parallel_builder() {
416 let job = TranscodeConfig {
417 input: Some("/tmp/input.mp4".to_string()),
418 output: Some("/tmp/output.mp4".to_string()),
419 ..Default::default()
420 };
421
422 let encoder = ParallelEncodeBuilder::new()
423 .max_parallel(4)
424 .cores_per_encode(2)
425 .priority(ParallelPriority::High)
426 .add_job(job)
427 .build();
428
429 assert_eq!(encoder.config.max_parallel, 4);
430 assert_eq!(encoder.config.cores_per_encode, Some(2));
431 assert_eq!(encoder.config.priority, ParallelPriority::High);
432 assert_eq!(encoder.job_count(), 1);
433 }
434
435 #[test]
436 fn test_num_cpus() {
437 let cpus = num_cpus();
438 assert!(cpus > 0);
439 assert!(cpus <= 1024); }
441}