Skip to main content

oximedia_transcode/
parallel.rs

1//! Parallel encoding for multiple outputs simultaneously.
2
3use crate::{Result, TranscodeConfig, TranscodeError, TranscodeOutput};
4use rayon::prelude::*;
5use std::sync::{Arc, Mutex};
6
7/// Configuration for parallel encoding.
8#[derive(Debug, Clone)]
9pub struct ParallelConfig {
10    /// Maximum number of parallel encodes.
11    pub max_parallel: usize,
12    /// CPU cores to use per encode.
13    pub cores_per_encode: Option<usize>,
14    /// Whether to use thread pools.
15    pub use_thread_pool: bool,
16    /// Priority for parallel jobs.
17    pub priority: ParallelPriority,
18}
19
/// Priority classes a parallel job can be tagged with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelPriority {
    /// Background work that can yield to everything else.
    Low,
    /// The regular priority.
    Normal,
    /// Time-sensitive work.
    High,
}
30
31impl Default for ParallelConfig {
32    fn default() -> Self {
33        Self {
34            max_parallel: num_cpus(),
35            cores_per_encode: None,
36            use_thread_pool: true,
37            priority: ParallelPriority::Normal,
38        }
39    }
40}
41
42impl ParallelConfig {
43    /// Creates a new parallel config with automatic core detection.
44    #[must_use]
45    pub fn auto() -> Self {
46        Self::default()
47    }
48
49    /// Creates a config with a specific number of parallel jobs.
50    #[must_use]
51    pub fn with_max_parallel(max: usize) -> Self {
52        Self {
53            max_parallel: max,
54            ..Self::default()
55        }
56    }
57
58    /// Sets the number of cores per encode job.
59    #[must_use]
60    pub fn cores_per_encode(mut self, cores: usize) -> Self {
61        self.cores_per_encode = Some(cores);
62        self
63    }
64
65    /// Sets the priority level.
66    #[must_use]
67    pub fn priority(mut self, priority: ParallelPriority) -> Self {
68        self.priority = priority;
69        self
70    }
71
72    /// Validates the configuration.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if the configuration is invalid.
77    pub fn validate(&self) -> Result<()> {
78        if self.max_parallel == 0 {
79            return Err(TranscodeError::ValidationError(
80                crate::ValidationError::Unsupported(
81                    "max_parallel must be greater than 0".to_string(),
82                ),
83            ));
84        }
85
86        if let Some(cores) = self.cores_per_encode {
87            if cores == 0 {
88                return Err(TranscodeError::ValidationError(
89                    crate::ValidationError::Unsupported(
90                        "cores_per_encode must be greater than 0".to_string(),
91                    ),
92                ));
93            }
94        }
95
96        Ok(())
97    }
98}
99
/// Reports how many threads the system says can run in parallel.
///
/// When the query fails, 4 is returned as a fixed fallback.
fn num_cpus() -> usize {
    match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        // The query can fail on some platforms; fall back instead of failing.
        Err(_) => 4,
    }
}
108
109/// Parallel encoder for processing multiple outputs simultaneously.
110pub struct ParallelEncoder {
111    config: ParallelConfig,
112    jobs: Vec<TranscodeConfig>,
113    results: Arc<Mutex<Vec<Result<TranscodeOutput>>>>,
114}
115
116impl ParallelEncoder {
117    /// Creates a new parallel encoder with the given configuration.
118    #[must_use]
119    pub fn new(config: ParallelConfig) -> Self {
120        Self {
121            config,
122            jobs: Vec::new(),
123            results: Arc::new(Mutex::new(Vec::new())),
124        }
125    }
126
127    /// Adds a job to the parallel encoder.
128    pub fn add_job(&mut self, job: TranscodeConfig) {
129        self.jobs.push(job);
130    }
131
132    /// Adds multiple jobs at once.
133    pub fn add_jobs(&mut self, jobs: Vec<TranscodeConfig>) {
134        self.jobs.extend(jobs);
135    }
136
137    /// Gets the number of jobs queued.
138    #[must_use]
139    pub fn job_count(&self) -> usize {
140        self.jobs.len()
141    }
142
143    /// Executes all jobs in parallel.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the configuration is invalid. Individual job errors
148    /// are captured in the results.
149    pub async fn execute_all(&mut self) -> Result<Vec<Result<TranscodeOutput>>> {
150        self.config.validate()?;
151
152        // Configure thread pool
153        let pool = rayon::ThreadPoolBuilder::new()
154            .num_threads(self.config.max_parallel)
155            .build()
156            .map_err(|e| {
157                TranscodeError::PipelineError(format!("Failed to create thread pool: {e}"))
158            })?;
159
160        let jobs = std::mem::take(&mut self.jobs);
161
162        // Execute jobs in parallel and collect results directly.
163        let job_results: Vec<Result<TranscodeOutput>> = pool.install(|| {
164            jobs.into_par_iter()
165                .map(Self::execute_job)
166                .collect::<Vec<_>>()
167        });
168
169        // Store results for later retrieval.
170        match self.results.lock() {
171            Ok(mut guard) => {
172                guard.extend(job_results.iter().cloned());
173            }
174            Err(poisoned) => {
175                poisoned.into_inner().extend(job_results.iter().cloned());
176            }
177        }
178
179        Ok(job_results)
180    }
181
182    /// Executes all jobs sequentially (for debugging).
183    ///
184    /// # Errors
185    ///
186    /// Returns an error if any job fails.
187    pub async fn execute_sequential(&mut self) -> Result<Vec<TranscodeOutput>> {
188        let mut outputs = Vec::new();
189
190        for job in &self.jobs {
191            let output = Self::execute_job(job.clone())?;
192            outputs.push(output);
193        }
194
195        Ok(outputs)
196    }
197
198    /// Executes a single transcode job synchronously.
199    ///
200    /// Validates the job configuration and then delegates to the
201    /// pipeline builder for actual transcoding. The pipeline is
202    /// executed on a per-thread tokio runtime so that async I/O
203    /// works within the rayon thread pool.
204    #[cfg(not(target_arch = "wasm32"))]
205    fn execute_job(job: TranscodeConfig) -> Result<TranscodeOutput> {
206        let input = job
207            .input
208            .as_deref()
209            .ok_or_else(|| TranscodeError::InvalidInput("No input file specified".to_string()))?;
210
211        let output = job
212            .output
213            .as_deref()
214            .ok_or_else(|| TranscodeError::InvalidOutput("No output file specified".to_string()))?;
215
216        // Build a pipeline from the job config.
217        let mut pipeline_builder = crate::pipeline::TranscodePipelineBuilder::new()
218            .input(input)
219            .output(output);
220
221        if let Some(ref vc) = job.video_codec {
222            pipeline_builder = pipeline_builder.video_codec(vc);
223        }
224        if let Some(ref ac) = job.audio_codec {
225            pipeline_builder = pipeline_builder.audio_codec(ac);
226        }
227        if let Some(mode) = job.multi_pass {
228            pipeline_builder = pipeline_builder.multipass(mode);
229        }
230
231        let mut pipeline = pipeline_builder.build()?;
232
233        // Create a per-thread tokio runtime to drive the async pipeline.
234        let rt = tokio::runtime::Builder::new_current_thread()
235            .enable_all()
236            .build()
237            .map_err(|e| {
238                TranscodeError::PipelineError(format!("Failed to create async runtime: {e}"))
239            })?;
240
241        rt.block_on(pipeline.execute())
242    }
243
244    /// Executes a single transcode job synchronously (wasm32 stub).
245    #[cfg(target_arch = "wasm32")]
246    fn execute_job(_job: TranscodeConfig) -> Result<TranscodeOutput> {
247        Err(TranscodeError::Unsupported(
248            "Parallel job execution is not supported on wasm32".to_string(),
249        ))
250    }
251
252    /// Gets the results of completed jobs.
253    #[must_use]
254    pub fn get_results(&self) -> Vec<Result<TranscodeOutput>> {
255        match self.results.lock() {
256            Ok(guard) => guard.clone(),
257            Err(poisoned) => poisoned.into_inner().clone(),
258        }
259    }
260
261    /// Clears all jobs and results.
262    pub fn clear(&mut self) {
263        self.jobs.clear();
264        match self.results.lock() {
265            Ok(mut guard) => guard.clear(),
266            Err(poisoned) => poisoned.into_inner().clear(),
267        }
268    }
269}
270
271// ─── AV1 tile-based parallel encoding ────────────────────────────────────────
272
/// Tile layout and threading options for AV1 tile-parallel encoding.
///
/// AV1 lets a frame be split into an `M × N` grid of tiles that can be
/// encoded independently of each other.  Encoding those tiles concurrently
/// with rayon shortens wall-clock time on multi-core machines, and the
/// resulting bitstream remains decodable by single-threaded decoders.
///
/// Both grid dimensions are stored as **log2** values, following the AV1
/// specification (for example `2` means 4 tile columns).
/// [`Av1TileConfig::auto`] picks reasonable values for a given resolution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Av1TileConfig {
    /// Tile column count expressed as a log2 value (0–6).
    pub tile_cols_log2: u8,
    /// Tile row count expressed as a log2 value (0–6).
    pub tile_rows_log2: u8,
    /// How many rayon worker threads to use (0 = auto).
    pub threads: usize,
    /// Whether row-based multi-threading inside each tile is enabled.
    pub row_mt: bool,
}

impl Default for Av1TileConfig {
    /// A 2×2 tile grid, automatic thread count, row multi-threading on.
    fn default() -> Self {
        Av1TileConfig {
            // log2 of 1 in each dimension → 2 columns by 2 rows.
            tile_cols_log2: 1,
            tile_rows_log2: 1,
            threads: 0,
            row_mt: true,
        }
    }
}
305
306impl Av1TileConfig {
307    /// Creates a new AV1 tile configuration with explicit log2 tile counts.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error when `tile_cols_log2` or `tile_rows_log2` exceeds 6.
312    pub fn new(tile_cols_log2: u8, tile_rows_log2: u8, threads: usize) -> Result<Self> {
313        if tile_cols_log2 > 6 {
314            return Err(TranscodeError::ValidationError(
315                crate::ValidationError::Unsupported(format!(
316                    "tile_cols_log2 must be 0–6, got {tile_cols_log2}"
317                )),
318            ));
319        }
320        if tile_rows_log2 > 6 {
321            return Err(TranscodeError::ValidationError(
322                crate::ValidationError::Unsupported(format!(
323                    "tile_rows_log2 must be 0–6, got {tile_rows_log2}"
324                )),
325            ));
326        }
327        Ok(Self {
328            tile_cols_log2,
329            tile_rows_log2,
330            threads,
331            row_mt: true,
332        })
333    }
334
335    /// Automatically selects tile counts appropriate for the given resolution.
336    ///
337    /// | Resolution    | tile_cols_log2 | tile_rows_log2 |
338    /// |---------------|----------------|----------------|
339    /// | ≤ 720p        | 1 (2 cols)     | 0 (1 row)      |
340    /// | 1080p         | 1 (2 cols)     | 1 (2 rows)     |
341    /// | 4K (2160p)    | 2 (4 cols)     | 2 (4 rows)     |
342    /// | 8K (4320p)    | 3 (8 cols)     | 2 (4 rows)     |
343    #[must_use]
344    pub fn auto(_width: u32, height: u32, threads: usize) -> Self {
345        let (cols_log2, rows_log2) = if height <= 720 {
346            (1, 0)
347        } else if height <= 1080 {
348            (1, 1)
349        } else if height <= 2160 {
350            (2, 2)
351        } else {
352            (3, 2)
353        };
354        Self {
355            tile_cols_log2: cols_log2,
356            tile_rows_log2: rows_log2,
357            threads,
358            row_mt: true,
359        }
360    }
361
362    /// Returns the actual number of tile columns (2^tile_cols_log2).
363    #[must_use]
364    pub fn tile_cols(&self) -> u32 {
365        1u32 << self.tile_cols_log2
366    }
367
368    /// Returns the actual number of tile rows (2^tile_rows_log2).
369    #[must_use]
370    pub fn tile_rows(&self) -> u32 {
371        1u32 << self.tile_rows_log2
372    }
373
374    /// Returns the total number of tiles per frame.
375    #[must_use]
376    pub fn total_tiles(&self) -> u32 {
377        self.tile_cols() * self.tile_rows()
378    }
379
380    /// Validates the configuration against frame dimensions.
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if the tile grid would produce tiles smaller than
385    /// 64×64 pixels (the AV1 minimum superblock size).
386    pub fn validate_for_frame(&self, width: u32, height: u32) -> Result<()> {
387        const MIN_TILE_DIM: u32 = 64;
388        let tile_w = width / self.tile_cols();
389        let tile_h = height / self.tile_rows();
390        if tile_w < MIN_TILE_DIM || tile_h < MIN_TILE_DIM {
391            return Err(TranscodeError::ValidationError(
392                crate::ValidationError::Unsupported(format!(
393                    "Tile grid {}×{} produces tiles {}×{} which is smaller than \
394                     the AV1 minimum {}×{} pixels",
395                    self.tile_cols(),
396                    self.tile_rows(),
397                    tile_w,
398                    tile_h,
399                    MIN_TILE_DIM,
400                    MIN_TILE_DIM
401                )),
402            ));
403        }
404        Ok(())
405    }
406}
407
/// Counters collected while encoding AV1 tiles.
#[derive(Debug, Clone, Default)]
pub struct Av1TileStats {
    /// How many tiles have been encoded.
    pub tiles_encoded: u32,
    /// Sum of the compressed sizes of all tiles, in bytes.
    pub compressed_bytes: u64,
    /// Elapsed wall-clock time, in seconds.
    pub wall_time_secs: f64,
}

impl Av1TileStats {
    /// Tiles encoded per second of wall-clock time.
    ///
    /// Returns `0.0` unless the recorded time is strictly positive.
    #[must_use]
    pub fn tiles_per_second(&self) -> f64 {
        let has_elapsed_time = self.wall_time_secs > 0.0;
        if has_elapsed_time {
            f64::from(self.tiles_encoded) / self.wall_time_secs
        } else {
            0.0
        }
    }

    /// Mean compressed size of a tile in bytes (integer division).
    ///
    /// Returns `0` when no tile has been encoded.
    #[must_use]
    pub fn avg_bytes_per_tile(&self) -> u64 {
        // `checked_div` yields `None` only for a zero tile count.
        self.compressed_bytes
            .checked_div(u64::from(self.tiles_encoded))
            .unwrap_or(0)
    }
}
439
440/// Encoder that splits video frames into tiles and encodes each tile in
441/// parallel using rayon.
442///
443/// The encoder works with raw RGBA or YUV420 frame data (stored as a flat
444/// `Vec<u8>`).  Each tile is extracted from the source buffer, passed to
445/// the configurable `Av1TileEncodeOp` trait, and the resulting bitstreams
446/// are assembled into a single frame bitstream.
447pub struct Av1TileParallelEncoder {
448    tile_config: Av1TileConfig,
449    frame_width: u32,
450    frame_height: u32,
451    stats: Av1TileStats,
452}
453
454impl Av1TileParallelEncoder {
455    /// Creates a new tile encoder for frames of `frame_width × frame_height`.
456    ///
457    /// # Errors
458    ///
459    /// Returns an error when the tile configuration is incompatible with the
460    /// frame dimensions.
461    pub fn new(tile_config: Av1TileConfig, frame_width: u32, frame_height: u32) -> Result<Self> {
462        tile_config.validate_for_frame(frame_width, frame_height)?;
463        Ok(Self {
464            tile_config,
465            frame_width,
466            frame_height,
467            stats: Av1TileStats::default(),
468        })
469    }
470
471    /// Returns the tile configuration.
472    #[must_use]
473    pub fn tile_config(&self) -> &Av1TileConfig {
474        &self.tile_config
475    }
476
477    /// Returns collected encoding statistics.
478    #[must_use]
479    pub fn stats(&self) -> &Av1TileStats {
480        &self.stats
481    }
482
483    /// Encodes one RGBA frame using tile-based rayon parallelism.
484    ///
485    /// The frame data must be a row-major RGBA buffer of exactly
486    /// `frame_width * frame_height * 4` bytes.
487    ///
488    /// # Returns
489    ///
490    /// An assembled tile bitstream (simple concatenation with 4-byte LE length
491    /// prefix per tile, as produced by [`assemble_av1_tile_bitstream`]).
492    ///
493    /// # Errors
494    ///
495    /// Returns an error when the frame buffer is too small.
496    pub fn encode_frame_rgba(&mut self, rgba: &[u8]) -> Result<Vec<u8>> {
497        let expected = (self.frame_width * self.frame_height * 4) as usize;
498        if rgba.len() < expected {
499            return Err(TranscodeError::CodecError(format!(
500                "RGBA buffer too small: got {} bytes, need {}",
501                rgba.len(),
502                expected
503            )));
504        }
505
506        let start = std::time::Instant::now();
507        let tile_cols = self.tile_config.tile_cols();
508        let tile_rows = self.tile_config.tile_rows();
509
510        let tile_w = self.frame_width / tile_cols;
511        let tile_h = self.frame_height / tile_rows;
512
513        // Build list of (tile_col, tile_row) pairs for parallel iteration.
514        let coords: Vec<(u32, u32)> = (0..tile_rows)
515            .flat_map(|row| (0..tile_cols).map(move |col| (col, row)))
516            .collect();
517
518        // Extract and compress each tile in parallel.
519        let tile_bitstreams: Vec<(usize, Vec<u8>)> = {
520            use rayon::prelude::*;
521
522            let frame_width = self.frame_width;
523
524            coords
525                .par_iter()
526                .enumerate()
527                .map(|(idx, &(col, row))| {
528                    let x_start = col * tile_w;
529                    let y_start = row * tile_h;
530
531                    // Extract tile RGBA into a contiguous buffer.
532                    let mut tile_buf = Vec::with_capacity((tile_w * tile_h * 4) as usize);
533                    for ty in 0..tile_h {
534                        let src_row = y_start + ty;
535                        let src_start = ((src_row * frame_width + x_start) * 4) as usize;
536                        let src_end = src_start + (tile_w * 4) as usize;
537                        if src_end <= rgba.len() {
538                            tile_buf.extend_from_slice(&rgba[src_start..src_end]);
539                        }
540                    }
541
542                    // Compress tile: simple RLE on luma bytes as a proxy for
543                    // a real AV1 tile encode (full codec integration requires
544                    // the AV1 encoder stack; this provides the parallelism
545                    // scaffold and correct bitstream assembly).
546                    let compressed = compress_tile_placeholder(&tile_buf);
547
548                    (idx, compressed)
549                })
550                .collect()
551        };
552
553        let compressed_total: u64 = tile_bitstreams.iter().map(|(_, b)| b.len() as u64).sum();
554
555        self.stats.tiles_encoded += tile_bitstreams.len() as u32;
556        self.stats.compressed_bytes += compressed_total;
557        self.stats.wall_time_secs += start.elapsed().as_secs_f64();
558
559        Ok(assemble_av1_tile_bitstream(tile_bitstreams))
560    }
561
562    /// Resets the accumulated statistics.
563    pub fn reset_stats(&mut self) {
564        self.stats = Av1TileStats::default();
565    }
566}
567
/// Packs the per-tile payloads of one frame into a single byte stream.
///
/// Layout: `[tile_count: u32 LE]` followed, for every tile in ascending
/// index order, by `[tile_idx: u32 LE] [byte_len: u32 LE] [data …]`.
#[must_use]
pub fn assemble_av1_tile_bitstream(tiles: Vec<(usize, Vec<u8>)>) -> Vec<u8> {
    let tile_count = tiles.len() as u32;

    // Tiles may arrive in any order; emit them sorted by index.
    let mut ordered = tiles;
    ordered.sort_by_key(|entry| entry.0);

    // The stream starts with the tile count header.
    let mut bitstream = tile_count.to_le_bytes().to_vec();
    for (tile_idx, payload) in &ordered {
        let idx_field = *tile_idx as u32;
        let len_field = payload.len() as u32;
        bitstream.extend_from_slice(&idx_field.to_le_bytes());
        bitstream.extend_from_slice(&len_field.to_le_bytes());
        bitstream.extend_from_slice(payload);
    }

    bitstream
}
590
/// Stand-in tile compressor: run-length encodes every 4th byte of the input
/// (the R channel of RGBA, used as a luma proxy).
///
/// The output is a sequence of `(value, run_len)` byte pairs, where `run_len`
/// is between 1 and 255; longer runs are split into several pairs.  Empty
/// input produces empty output.  In production this would call the AV1 tile
/// encoder from `oximedia-codec`.
fn compress_tile_placeholder(rgba: &[u8]) -> Vec<u8> {
    // One sample per started 4-byte group.
    let sample_count = rgba.len().div_ceil(4);
    let mut samples = rgba.iter().step_by(4).copied().peekable();

    let mut encoded = Vec::with_capacity(sample_count);
    while let Some(value) = samples.next() {
        // Consume following samples while they repeat `value`, up to the
        // largest run a single byte can describe.
        let mut run_len: u8 = 1;
        while run_len < u8::MAX && samples.next_if_eq(&value).is_some() {
            run_len += 1;
        }
        encoded.push(value);
        encoded.push(run_len);
    }
    encoded
}
617
618/// Builder for creating parallel encode jobs.
619pub struct ParallelEncodeBuilder {
620    config: ParallelConfig,
621    jobs: Vec<TranscodeConfig>,
622}
623
624impl ParallelEncodeBuilder {
625    /// Creates a new parallel encode builder.
626    #[must_use]
627    pub fn new() -> Self {
628        Self {
629            config: ParallelConfig::default(),
630            jobs: Vec::new(),
631        }
632    }
633
634    /// Sets the maximum number of parallel jobs.
635    #[must_use]
636    pub fn max_parallel(mut self, max: usize) -> Self {
637        self.config.max_parallel = max;
638        self
639    }
640
641    /// Sets cores per encode job.
642    #[must_use]
643    pub fn cores_per_encode(mut self, cores: usize) -> Self {
644        self.config.cores_per_encode = Some(cores);
645        self
646    }
647
648    /// Sets the priority level.
649    #[must_use]
650    pub fn priority(mut self, priority: ParallelPriority) -> Self {
651        self.config.priority = priority;
652        self
653    }
654
655    /// Adds a job to the builder.
656    #[must_use]
657    pub fn add_job(mut self, job: TranscodeConfig) -> Self {
658        self.jobs.push(job);
659        self
660    }
661
662    /// Adds multiple jobs.
663    #[must_use]
664    pub fn add_jobs(mut self, jobs: Vec<TranscodeConfig>) -> Self {
665        self.jobs.extend(jobs);
666        self
667    }
668
669    /// Builds the parallel encoder.
670    #[must_use]
671    pub fn build(self) -> ParallelEncoder {
672        let mut encoder = ParallelEncoder::new(self.config);
673        encoder.add_jobs(self.jobs);
674        encoder
675    }
676}
677
678impl Default for ParallelEncodeBuilder {
679    fn default() -> Self {
680        Self::new()
681    }
682}
683
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a path string inside the system temp directory for `name`.
    fn tmp_str(name: &str) -> String {
        let file_name = format!("oximedia-transcode-parallel-{name}");
        std::env::temp_dir()
            .join(file_name)
            .to_string_lossy()
            .into_owned()
    }

    /// Builds a job whose input and output both live in the temp directory.
    fn job_with_paths(input: &str, output: &str) -> TranscodeConfig {
        TranscodeConfig {
            input: Some(tmp_str(input)),
            output: Some(tmp_str(output)),
            ..Default::default()
        }
    }

    /// Decodes the little-endian `u32` stored at `offset` in `bytes`.
    fn read_u32_le(bytes: &[u8], offset: usize) -> u32 {
        u32::from_le_bytes([
            bytes[offset],
            bytes[offset + 1],
            bytes[offset + 2],
            bytes[offset + 3],
        ])
    }

    // ── ParallelConfig / ParallelEncoder tests ───────────────────────────────

    #[test]
    fn test_parallel_config_default() {
        let defaults = ParallelConfig::default();
        assert!(defaults.max_parallel > 0);
        assert_eq!(defaults.priority, ParallelPriority::Normal);
        assert!(defaults.use_thread_pool);
    }

    #[test]
    fn test_parallel_config_validation() {
        let accepted = ParallelConfig::with_max_parallel(4);
        assert!(accepted.validate().is_ok());

        let rejected = ParallelConfig {
            max_parallel: 0,
            ..Default::default()
        };
        assert!(rejected.validate().is_err());
    }

    #[test]
    fn test_parallel_config_cores_validation() {
        let accepted = ParallelConfig::default().cores_per_encode(2);
        assert!(accepted.validate().is_ok());

        let rejected = ParallelConfig::default().cores_per_encode(0);
        assert!(rejected.validate().is_err());
    }

    #[test]
    fn test_parallel_encoder_job_count() {
        let mut encoder = ParallelEncoder::new(ParallelConfig::default());
        assert_eq!(encoder.job_count(), 0);

        encoder.add_job(job_with_paths("input.mp4", "output.mp4"));
        assert_eq!(encoder.job_count(), 1);
    }

    #[test]
    fn test_parallel_encoder_add_jobs() {
        let mut encoder = ParallelEncoder::new(ParallelConfig::default());

        let batch = vec![
            job_with_paths("input1.mp4", "output1.mp4"),
            job_with_paths("input2.mp4", "output2.mp4"),
        ];

        encoder.add_jobs(batch);
        assert_eq!(encoder.job_count(), 2);
    }

    #[test]
    fn test_parallel_encoder_clear() {
        let mut encoder = ParallelEncoder::new(ParallelConfig::default());

        encoder.add_job(job_with_paths("input.mp4", "output.mp4"));
        assert_eq!(encoder.job_count(), 1);

        encoder.clear();
        assert_eq!(encoder.job_count(), 0);
    }

    #[test]
    fn test_parallel_builder() {
        let encoder = ParallelEncodeBuilder::new()
            .max_parallel(4)
            .cores_per_encode(2)
            .priority(ParallelPriority::High)
            .add_job(job_with_paths("input.mp4", "output.mp4"))
            .build();

        let config = &encoder.config;
        assert_eq!(config.max_parallel, 4);
        assert_eq!(config.cores_per_encode, Some(2));
        assert_eq!(config.priority, ParallelPriority::High);
        assert_eq!(encoder.job_count(), 1);
    }

    #[test]
    fn test_num_cpus() {
        let detected = num_cpus();
        assert!(detected > 0);
        assert!(detected <= 1024); // Reasonable upper bound
    }

    // ── Av1TileConfig tests ───────────────────────────────────────────────────

    #[test]
    fn test_av1_tile_config_default() {
        let tiles = Av1TileConfig::default();
        assert_eq!(tiles.tile_cols(), 2);
        assert_eq!(tiles.tile_rows(), 2);
        assert_eq!(tiles.total_tiles(), 4);
        assert!(tiles.row_mt);
    }

    #[test]
    fn test_av1_tile_config_new_valid() {
        let tiles = Av1TileConfig::new(2, 1, 4).expect("valid config");
        assert_eq!(tiles.tile_cols(), 4);
        assert_eq!(tiles.tile_rows(), 2);
        assert_eq!(tiles.total_tiles(), 8);
    }

    #[test]
    fn test_av1_tile_config_new_invalid_cols() {
        let outcome = Av1TileConfig::new(7, 1, 0);
        assert!(outcome.is_err(), "log2 > 6 should fail");
    }

    #[test]
    fn test_av1_tile_config_new_invalid_rows() {
        let outcome = Av1TileConfig::new(1, 7, 0);
        assert!(outcome.is_err(), "log2 > 6 should fail");
    }

    #[test]
    fn test_av1_tile_config_auto_720p() {
        let tiles = Av1TileConfig::auto(1280, 720, 4);
        assert_eq!(tiles.tile_cols_log2, 1);
        assert_eq!(tiles.tile_rows_log2, 0);
    }

    #[test]
    fn test_av1_tile_config_auto_1080p() {
        let tiles = Av1TileConfig::auto(1920, 1080, 4);
        assert_eq!(tiles.tile_cols_log2, 1);
        assert_eq!(tiles.tile_rows_log2, 1);
    }

    #[test]
    fn test_av1_tile_config_auto_4k() {
        let tiles = Av1TileConfig::auto(3840, 2160, 8);
        assert_eq!(tiles.tile_cols_log2, 2);
        assert_eq!(tiles.tile_rows_log2, 2);
    }

    #[test]
    fn test_av1_tile_config_validate_ok() {
        // 1920×1080 with 2×2 tiles → 960×540 each — above 64×64 minimum
        let tiles = Av1TileConfig::new(1, 1, 0).expect("valid");
        assert!(tiles.validate_for_frame(1920, 1080).is_ok());
    }

    #[test]
    fn test_av1_tile_config_validate_too_small() {
        // 256×256 with 8×8 tiles → 32×32 each — below 64×64 minimum
        let tiles = Av1TileConfig::new(3, 3, 0).expect("valid config");
        assert!(tiles.validate_for_frame(256, 256).is_err());
    }

    // ── Av1TileParallelEncoder tests ──────────────────────────────────────────

    #[test]
    fn test_av1_tile_parallel_encoder_encode_frame() {
        // 512×512 RGBA frame; 2×2 tile grid (each tile 256×256)
        let tiles = Av1TileConfig::new(1, 1, 2).expect("valid");
        let mut encoder = Av1TileParallelEncoder::new(tiles, 512, 512).expect("encoder ok");

        let grey_frame = vec![128u8; 512 * 512 * 4]; // grey RGBA
        let bitstream = encoder.encode_frame_rgba(&grey_frame).expect("encode ok");

        // Bitstream must contain the 4-byte tile count header + at least 4 tiles.
        assert!(bitstream.len() >= 4, "bitstream should have header");
        assert_eq!(read_u32_le(&bitstream, 0), 4, "should encode 4 tiles");

        // Stats should be updated.
        let stats = encoder.stats();
        assert_eq!(stats.tiles_encoded, 4);
        assert!(stats.compressed_bytes > 0);
    }

    #[test]
    fn test_av1_tile_parallel_encoder_undersized_frame() {
        let mut encoder =
            Av1TileParallelEncoder::new(Av1TileConfig::default(), 256, 256).expect("encoder ok");

        // Provide only 1 byte — way too small.
        let outcome = encoder.encode_frame_rgba(&[0u8]);
        assert!(outcome.is_err(), "undersized frame should fail");
    }

    #[test]
    fn test_av1_tile_parallel_encoder_stats_reset() {
        let tiles = Av1TileConfig::new(1, 1, 2).expect("valid");
        let mut encoder = Av1TileParallelEncoder::new(tiles, 256, 256).expect("encoder ok");

        let black_frame = vec![0u8; 256 * 256 * 4];
        encoder.encode_frame_rgba(&black_frame).expect("encode ok");
        assert!(encoder.stats().tiles_encoded > 0);

        encoder.reset_stats();
        assert_eq!(encoder.stats().tiles_encoded, 0);
        assert_eq!(encoder.stats().compressed_bytes, 0);
    }

    // ── Av1TileStats tests ────────────────────────────────────────────────────

    #[test]
    fn test_av1_tile_stats_tiles_per_second() {
        let stats = Av1TileStats {
            tiles_encoded: 100,
            compressed_bytes: 50_000,
            wall_time_secs: 2.0,
        };
        assert!((stats.tiles_per_second() - 50.0).abs() < 1e-9);
        assert_eq!(stats.avg_bytes_per_tile(), 500);
    }

    #[test]
    fn test_av1_tile_stats_zero_time() {
        let stats = Av1TileStats::default();
        assert!((stats.tiles_per_second()).abs() < 1e-9);
        assert_eq!(stats.avg_bytes_per_tile(), 0);
    }

    // ── Bitstream helper tests ────────────────────────────────────────────────

    #[test]
    fn test_assemble_av1_tile_bitstream_order() {
        // Tiles arrive out of order; assembled bitstream must sort them.
        let unordered = vec![(1, vec![1u8, 2, 3]), (0, vec![4u8, 5, 6])];
        let bitstream = assemble_av1_tile_bitstream(unordered);

        // Count = 2
        assert_eq!(read_u32_le(&bitstream, 0), 2);

        // First tile entry's index should be 0.
        assert_eq!(read_u32_le(&bitstream, 4), 0);
    }

    #[test]
    fn test_compress_tile_placeholder_empty() {
        let compressed = compress_tile_placeholder(&[]);
        assert!(compressed.is_empty());
    }

    #[test]
    fn test_compress_tile_placeholder_rle() {
        // 4 identical RGBA pixels (luma = 200) → single run of 4
        let pixel = [200u8, 0, 0, 255];
        let rgba: Vec<u8> = pixel.iter().copied().cycle().take(16).collect();

        // Should produce [200, 4] (one run of length 4)
        assert_eq!(compress_tile_placeholder(&rgba), vec![200, 4]);
    }
}