Skip to main content

oximedia_transcode/
parallel.rs

1//! Parallel encoding for multiple outputs simultaneously.
2
3use crate::{Result, TranscodeConfig, TranscodeError, TranscodeOutput};
4use rayon::prelude::*;
5use std::sync::{Arc, Mutex};
6
7/// Configuration for parallel encoding.
8#[derive(Debug, Clone)]
9pub struct ParallelConfig {
10    /// Maximum number of parallel encodes.
11    pub max_parallel: usize,
12    /// CPU cores to use per encode.
13    pub cores_per_encode: Option<usize>,
14    /// Whether to use thread pools.
15    pub use_thread_pool: bool,
16    /// Priority for parallel jobs.
17    pub priority: ParallelPriority,
18}
19
/// How urgently a batch of parallel jobs should be treated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelPriority {
    /// Background work that may yield to everything else.
    Low,
    /// The default priority.
    Normal,
    /// Time-sensitive work.
    High,
}
30
31impl Default for ParallelConfig {
32    fn default() -> Self {
33        Self {
34            max_parallel: num_cpus(),
35            cores_per_encode: None,
36            use_thread_pool: true,
37            priority: ParallelPriority::Normal,
38        }
39    }
40}
41
42impl ParallelConfig {
43    /// Creates a new parallel config with automatic core detection.
44    #[must_use]
45    pub fn auto() -> Self {
46        Self::default()
47    }
48
49    /// Creates a config with a specific number of parallel jobs.
50    #[must_use]
51    pub fn with_max_parallel(max: usize) -> Self {
52        Self {
53            max_parallel: max,
54            ..Self::default()
55        }
56    }
57
58    /// Sets the number of cores per encode job.
59    #[must_use]
60    pub fn cores_per_encode(mut self, cores: usize) -> Self {
61        self.cores_per_encode = Some(cores);
62        self
63    }
64
65    /// Sets the priority level.
66    #[must_use]
67    pub fn priority(mut self, priority: ParallelPriority) -> Self {
68        self.priority = priority;
69        self
70    }
71
72    /// Validates the configuration.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if the configuration is invalid.
77    pub fn validate(&self) -> Result<()> {
78        if self.max_parallel == 0 {
79            return Err(TranscodeError::ValidationError(
80                crate::ValidationError::Unsupported(
81                    "max_parallel must be greater than 0".to_string(),
82                ),
83            ));
84        }
85
86        if let Some(cores) = self.cores_per_encode {
87            if cores == 0 {
88                return Err(TranscodeError::ValidationError(
89                    crate::ValidationError::Unsupported(
90                        "cores_per_encode must be greater than 0".to_string(),
91                    ),
92                ));
93            }
94        }
95
96        Ok(())
97    }
98}
99
/// Reports how much hardware parallelism the process can use.
///
/// Uses [`std::thread::available_parallelism`]; when that query fails the
/// function returns 4 instead of propagating the error.
fn num_cpus() -> usize {
    const FALLBACK_CORES: usize = 4;

    match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => FALLBACK_CORES,
    }
}
108
109/// Parallel encoder for processing multiple outputs simultaneously.
110pub struct ParallelEncoder {
111    config: ParallelConfig,
112    jobs: Vec<TranscodeConfig>,
113    results: Arc<Mutex<Vec<Result<TranscodeOutput>>>>,
114}
115
116impl ParallelEncoder {
117    /// Creates a new parallel encoder with the given configuration.
118    #[must_use]
119    pub fn new(config: ParallelConfig) -> Self {
120        Self {
121            config,
122            jobs: Vec::new(),
123            results: Arc::new(Mutex::new(Vec::new())),
124        }
125    }
126
127    /// Adds a job to the parallel encoder.
128    pub fn add_job(&mut self, job: TranscodeConfig) {
129        self.jobs.push(job);
130    }
131
132    /// Adds multiple jobs at once.
133    pub fn add_jobs(&mut self, jobs: Vec<TranscodeConfig>) {
134        self.jobs.extend(jobs);
135    }
136
137    /// Gets the number of jobs queued.
138    #[must_use]
139    pub fn job_count(&self) -> usize {
140        self.jobs.len()
141    }
142
143    /// Executes all jobs in parallel.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the configuration is invalid. Individual job errors
148    /// are captured in the results.
149    pub async fn execute_all(&mut self) -> Result<Vec<Result<TranscodeOutput>>> {
150        self.config.validate()?;
151
152        // Configure thread pool
153        let pool = rayon::ThreadPoolBuilder::new()
154            .num_threads(self.config.max_parallel)
155            .build()
156            .map_err(|e| {
157                TranscodeError::PipelineError(format!("Failed to create thread pool: {e}"))
158            })?;
159
160        let jobs = std::mem::take(&mut self.jobs);
161
162        // Execute jobs in parallel and collect results directly.
163        let job_results: Vec<Result<TranscodeOutput>> = pool.install(|| {
164            jobs.into_par_iter()
165                .map(Self::execute_job)
166                .collect::<Vec<_>>()
167        });
168
169        // Store results for later retrieval.
170        match self.results.lock() {
171            Ok(mut guard) => {
172                guard.extend(job_results.iter().cloned());
173            }
174            Err(poisoned) => {
175                poisoned.into_inner().extend(job_results.iter().cloned());
176            }
177        }
178
179        Ok(job_results)
180    }
181
182    /// Executes all jobs sequentially (for debugging).
183    ///
184    /// # Errors
185    ///
186    /// Returns an error if any job fails.
187    pub async fn execute_sequential(&mut self) -> Result<Vec<TranscodeOutput>> {
188        let mut outputs = Vec::new();
189
190        for job in &self.jobs {
191            let output = Self::execute_job(job.clone())?;
192            outputs.push(output);
193        }
194
195        Ok(outputs)
196    }
197
198    /// Executes a single transcode job synchronously.
199    ///
200    /// Validates the job configuration and then delegates to the
201    /// pipeline builder for actual transcoding. The pipeline is
202    /// executed on a per-thread tokio runtime so that async I/O
203    /// works within the rayon thread pool.
204    #[cfg(not(target_arch = "wasm32"))]
205    fn execute_job(job: TranscodeConfig) -> Result<TranscodeOutput> {
206        let input = job
207            .input
208            .as_deref()
209            .ok_or_else(|| TranscodeError::InvalidInput("No input file specified".to_string()))?;
210
211        let output = job
212            .output
213            .as_deref()
214            .ok_or_else(|| TranscodeError::InvalidOutput("No output file specified".to_string()))?;
215
216        // Build a pipeline from the job config.
217        let mut pipeline_builder = crate::pipeline::TranscodePipelineBuilder::new()
218            .input(input)
219            .output(output);
220
221        if let Some(ref vc) = job.video_codec {
222            pipeline_builder = pipeline_builder.video_codec(vc);
223        }
224        if let Some(ref ac) = job.audio_codec {
225            pipeline_builder = pipeline_builder.audio_codec(ac);
226        }
227        if let Some(mode) = job.multi_pass {
228            pipeline_builder = pipeline_builder.multipass(mode);
229        }
230
231        let mut pipeline = pipeline_builder.build()?;
232
233        // Create a per-thread tokio runtime to drive the async pipeline.
234        let rt = tokio::runtime::Builder::new_current_thread()
235            .enable_all()
236            .build()
237            .map_err(|e| {
238                TranscodeError::PipelineError(format!("Failed to create async runtime: {e}"))
239            })?;
240
241        rt.block_on(pipeline.execute())
242    }
243
244    /// Executes a single transcode job synchronously (wasm32 stub).
245    #[cfg(target_arch = "wasm32")]
246    fn execute_job(_job: TranscodeConfig) -> Result<TranscodeOutput> {
247        Err(TranscodeError::Unsupported(
248            "Parallel job execution is not supported on wasm32".to_string(),
249        ))
250    }
251
252    /// Gets the results of completed jobs.
253    #[must_use]
254    pub fn get_results(&self) -> Vec<Result<TranscodeOutput>> {
255        match self.results.lock() {
256            Ok(guard) => guard.clone(),
257            Err(poisoned) => poisoned.into_inner().clone(),
258        }
259    }
260
261    /// Clears all jobs and results.
262    pub fn clear(&mut self) {
263        self.jobs.clear();
264        match self.results.lock() {
265            Ok(mut guard) => guard.clear(),
266            Err(poisoned) => poisoned.into_inner().clear(),
267        }
268    }
269}
270
271// ─── AV1 tile-based parallel encoding ────────────────────────────────────────
272
/// Tile-grid settings for AV1 tile-based parallel encoding.
///
/// AV1 allows each frame to be split into an `M × N` grid of tiles that can
/// be encoded independently, which lets rayon spread the work across cores.
///
/// `tile_cols_log2` and `tile_rows_log2` hold **log2** values, following the
/// AV1 specification (e.g. 2 → 4 tile columns).  Use [`Av1TileConfig::auto`]
/// to pick defaults for a given resolution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Av1TileConfig {
    /// Base-2 logarithm of the tile column count (0–6).
    pub tile_cols_log2: u8,
    /// Base-2 logarithm of the tile row count (0–6).
    pub tile_rows_log2: u8,
    /// Requested number of rayon worker threads (0 = auto).
    pub threads: usize,
    /// Whether row-based multi-threading inside each tile is requested.
    pub row_mt: bool,
}
294
295impl Default for Av1TileConfig {
296    fn default() -> Self {
297        Self {
298            tile_cols_log2: 1, // 2 columns
299            tile_rows_log2: 1, // 2 rows
300            threads: 0,
301            row_mt: true,
302        }
303    }
304}
305
306impl Av1TileConfig {
307    /// Creates a new AV1 tile configuration with explicit log2 tile counts.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error when `tile_cols_log2` or `tile_rows_log2` exceeds 6.
312    pub fn new(tile_cols_log2: u8, tile_rows_log2: u8, threads: usize) -> Result<Self> {
313        if tile_cols_log2 > 6 {
314            return Err(TranscodeError::ValidationError(
315                crate::ValidationError::Unsupported(format!(
316                    "tile_cols_log2 must be 0–6, got {tile_cols_log2}"
317                )),
318            ));
319        }
320        if tile_rows_log2 > 6 {
321            return Err(TranscodeError::ValidationError(
322                crate::ValidationError::Unsupported(format!(
323                    "tile_rows_log2 must be 0–6, got {tile_rows_log2}"
324                )),
325            ));
326        }
327        Ok(Self {
328            tile_cols_log2,
329            tile_rows_log2,
330            threads,
331            row_mt: true,
332        })
333    }
334
335    /// Automatically selects tile counts appropriate for the given resolution.
336    ///
337    /// | Resolution    | tile_cols_log2 | tile_rows_log2 |
338    /// |---------------|----------------|----------------|
339    /// | ≤ 720p        | 1 (2 cols)     | 0 (1 row)      |
340    /// | 1080p         | 1 (2 cols)     | 1 (2 rows)     |
341    /// | 4K (2160p)    | 2 (4 cols)     | 2 (4 rows)     |
342    /// | 8K (4320p)    | 3 (8 cols)     | 2 (4 rows)     |
343    #[must_use]
344    pub fn auto(_width: u32, height: u32, threads: usize) -> Self {
345        let (cols_log2, rows_log2) = if height <= 720 {
346            (1, 0)
347        } else if height <= 1080 {
348            (1, 1)
349        } else if height <= 2160 {
350            (2, 2)
351        } else {
352            (3, 2)
353        };
354        Self {
355            tile_cols_log2: cols_log2,
356            tile_rows_log2: rows_log2,
357            threads,
358            row_mt: true,
359        }
360    }
361
362    /// Returns the actual number of tile columns (2^tile_cols_log2).
363    #[must_use]
364    pub fn tile_cols(&self) -> u32 {
365        1u32 << self.tile_cols_log2
366    }
367
368    /// Returns the actual number of tile rows (2^tile_rows_log2).
369    #[must_use]
370    pub fn tile_rows(&self) -> u32 {
371        1u32 << self.tile_rows_log2
372    }
373
374    /// Returns the total number of tiles per frame.
375    #[must_use]
376    pub fn total_tiles(&self) -> u32 {
377        self.tile_cols() * self.tile_rows()
378    }
379
380    /// Validates the configuration against frame dimensions.
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if the tile grid would produce tiles smaller than
385    /// 64×64 pixels (the AV1 minimum superblock size).
386    pub fn validate_for_frame(&self, width: u32, height: u32) -> Result<()> {
387        const MIN_TILE_DIM: u32 = 64;
388        let tile_w = width / self.tile_cols();
389        let tile_h = height / self.tile_rows();
390        if tile_w < MIN_TILE_DIM || tile_h < MIN_TILE_DIM {
391            return Err(TranscodeError::ValidationError(
392                crate::ValidationError::Unsupported(format!(
393                    "Tile grid {}×{} produces tiles {}×{} which is smaller than \
394                     the AV1 minimum {}×{} pixels",
395                    self.tile_cols(),
396                    self.tile_rows(),
397                    tile_w,
398                    tile_h,
399                    MIN_TILE_DIM,
400                    MIN_TILE_DIM
401                )),
402            ));
403        }
404        Ok(())
405    }
406}
407
/// Counters describing AV1 tile encoding work.
#[derive(Debug, Clone, Default)]
pub struct Av1TileStats {
    /// Number of tiles that have been encoded.
    pub tiles_encoded: u32,
    /// Sum of the compressed sizes of all tiles, in bytes.
    pub compressed_bytes: u64,
    /// Elapsed wall-clock time, in seconds.
    pub wall_time_secs: f64,
}

impl Av1TileStats {
    /// Throughput in tiles per second.
    ///
    /// Yields `0.0` when no positive amount of time has been recorded.
    #[must_use]
    pub fn tiles_per_second(&self) -> f64 {
        let elapsed = self.wall_time_secs;
        // Guard against division by zero, negative and NaN durations alike.
        if elapsed.is_nan() || elapsed <= 0.0 {
            return 0.0;
        }
        f64::from(self.tiles_encoded) / elapsed
    }

    /// Mean compressed size per tile in bytes, rounded down.
    ///
    /// Yields `0` when no tiles have been encoded.
    #[must_use]
    pub fn avg_bytes_per_tile(&self) -> u64 {
        self.compressed_bytes
            .checked_div(u64::from(self.tiles_encoded))
            .unwrap_or(0)
    }
}
439
440/// Encoder that splits video frames into tiles and encodes each tile in
441/// parallel using rayon.
442///
443/// The encoder works with raw RGBA or YUV420 frame data (stored as a flat
444/// `Vec<u8>`).  Each tile is extracted from the source buffer, passed to
445/// the configurable `Av1TileEncodeOp` trait, and the resulting bitstreams
446/// are assembled into a single frame bitstream.
447pub struct Av1TileParallelEncoder {
448    tile_config: Av1TileConfig,
449    frame_width: u32,
450    frame_height: u32,
451    stats: Av1TileStats,
452}
453
454impl Av1TileParallelEncoder {
455    /// Creates a new tile encoder for frames of `frame_width × frame_height`.
456    ///
457    /// # Errors
458    ///
459    /// Returns an error when the tile configuration is incompatible with the
460    /// frame dimensions.
461    pub fn new(tile_config: Av1TileConfig, frame_width: u32, frame_height: u32) -> Result<Self> {
462        tile_config.validate_for_frame(frame_width, frame_height)?;
463        Ok(Self {
464            tile_config,
465            frame_width,
466            frame_height,
467            stats: Av1TileStats::default(),
468        })
469    }
470
471    /// Returns the tile configuration.
472    #[must_use]
473    pub fn tile_config(&self) -> &Av1TileConfig {
474        &self.tile_config
475    }
476
477    /// Returns collected encoding statistics.
478    #[must_use]
479    pub fn stats(&self) -> &Av1TileStats {
480        &self.stats
481    }
482
483    /// Encodes one RGBA frame using tile-based rayon parallelism.
484    ///
485    /// The frame data must be a row-major RGBA buffer of exactly
486    /// `frame_width * frame_height * 4` bytes.
487    ///
488    /// # Returns
489    ///
490    /// An assembled tile bitstream (simple concatenation with 4-byte LE length
491    /// prefix per tile, as produced by [`assemble_av1_tile_bitstream`]).
492    ///
493    /// # Errors
494    ///
495    /// Returns an error when the frame buffer is too small.
496    pub fn encode_frame_rgba(&mut self, rgba: &[u8]) -> Result<Vec<u8>> {
497        let expected = (self.frame_width * self.frame_height * 4) as usize;
498        if rgba.len() < expected {
499            return Err(TranscodeError::CodecError(format!(
500                "RGBA buffer too small: got {} bytes, need {}",
501                rgba.len(),
502                expected
503            )));
504        }
505
506        let start = std::time::Instant::now();
507        let tile_cols = self.tile_config.tile_cols();
508        let tile_rows = self.tile_config.tile_rows();
509
510        let tile_w = self.frame_width / tile_cols;
511        let tile_h = self.frame_height / tile_rows;
512
513        // Build list of (tile_col, tile_row) pairs for parallel iteration.
514        let coords: Vec<(u32, u32)> = (0..tile_rows)
515            .flat_map(|row| (0..tile_cols).map(move |col| (col, row)))
516            .collect();
517
518        // Extract and compress each tile in parallel.
519        let tile_bitstreams: Vec<(usize, Vec<u8>)> = {
520            use rayon::prelude::*;
521
522            let frame_width = self.frame_width;
523
524            coords
525                .par_iter()
526                .enumerate()
527                .map(|(idx, &(col, row))| {
528                    let x_start = col * tile_w;
529                    let y_start = row * tile_h;
530
531                    // Extract tile RGBA into a contiguous buffer.
532                    let mut tile_buf = Vec::with_capacity((tile_w * tile_h * 4) as usize);
533                    for ty in 0..tile_h {
534                        let src_row = y_start + ty;
535                        let src_start = ((src_row * frame_width + x_start) * 4) as usize;
536                        let src_end = src_start + (tile_w * 4) as usize;
537                        if src_end <= rgba.len() {
538                            tile_buf.extend_from_slice(&rgba[src_start..src_end]);
539                        }
540                    }
541
542                    // Compress tile: simple RLE on luma bytes as a proxy for
543                    // a real AV1 tile encode (full codec integration requires
544                    // the AV1 encoder stack; this provides the parallelism
545                    // scaffold and correct bitstream assembly).
546                    let compressed = compress_tile_placeholder(&tile_buf);
547
548                    (idx, compressed)
549                })
550                .collect()
551        };
552
553        let compressed_total: u64 = tile_bitstreams.iter().map(|(_, b)| b.len() as u64).sum();
554
555        self.stats.tiles_encoded += tile_bitstreams.len() as u32;
556        self.stats.compressed_bytes += compressed_total;
557        self.stats.wall_time_secs += start.elapsed().as_secs_f64();
558
559        Ok(assemble_av1_tile_bitstream(tile_bitstreams))
560    }
561
562    /// Resets the accumulated statistics.
563    pub fn reset_stats(&mut self) {
564        self.stats = Av1TileStats::default();
565    }
566}
567
/// Concatenates per-tile payloads into one frame bitstream.
///
/// Tiles are emitted in ascending index order regardless of the order in
/// which they are supplied.
///
/// Format: `[tile_count: u32 LE] ([tile_idx: u32 LE] [byte_len: u32 LE]
/// [data …]) …`
#[must_use]
pub fn assemble_av1_tile_bitstream(tiles: Vec<(usize, Vec<u8>)>) -> Vec<u8> {
    let mut ordered = tiles;
    ordered.sort_by_key(|entry| entry.0);

    // The stream opens with the tile count; each record is appended after it.
    let header = (ordered.len() as u32).to_le_bytes();
    ordered
        .iter()
        .fold(header.to_vec(), |mut stream, (tile_idx, payload)| {
            stream.extend_from_slice(&(*tile_idx as u32).to_le_bytes());
            stream.extend_from_slice(&(payload.len() as u32).to_le_bytes());
            stream.extend_from_slice(payload);
            stream
        })
}
590
/// Stand-in tile compressor: run-length encodes every fourth byte of the
/// input (the R channel of RGBA, used as a luma proxy).
///
/// The output is a sequence of `(value, run_len)` byte pairs, where
/// `run_len` is between 1 and 255; longer runs are split into several pairs.
/// An empty input yields an empty output.  In production this would call the
/// AV1 tile encoder from `oximedia-codec`.
fn compress_tile_placeholder(rgba: &[u8]) -> Vec<u8> {
    // One sample per started 4-byte group.
    let sample_count = rgba.len().div_ceil(4);
    let mut encoded = Vec::with_capacity(sample_count);

    let mut samples = rgba.iter().step_by(4).copied().peekable();
    while let Some(value) = samples.next() {
        let mut run_len: u8 = 1;
        // The cap is tested first so a full run never consumes another sample.
        while run_len < u8::MAX && samples.next_if_eq(&value).is_some() {
            run_len += 1;
        }
        encoded.extend_from_slice(&[value, run_len]);
    }

    encoded
}
617
618/// Builder for creating parallel encode jobs.
619pub struct ParallelEncodeBuilder {
620    config: ParallelConfig,
621    jobs: Vec<TranscodeConfig>,
622}
623
624impl ParallelEncodeBuilder {
625    /// Creates a new parallel encode builder.
626    #[must_use]
627    pub fn new() -> Self {
628        Self {
629            config: ParallelConfig::default(),
630            jobs: Vec::new(),
631        }
632    }
633
634    /// Sets the maximum number of parallel jobs.
635    #[must_use]
636    pub fn max_parallel(mut self, max: usize) -> Self {
637        self.config.max_parallel = max;
638        self
639    }
640
641    /// Sets cores per encode job.
642    #[must_use]
643    pub fn cores_per_encode(mut self, cores: usize) -> Self {
644        self.config.cores_per_encode = Some(cores);
645        self
646    }
647
648    /// Sets the priority level.
649    #[must_use]
650    pub fn priority(mut self, priority: ParallelPriority) -> Self {
651        self.config.priority = priority;
652        self
653    }
654
655    /// Adds a job to the builder.
656    #[must_use]
657    pub fn add_job(mut self, job: TranscodeConfig) -> Self {
658        self.jobs.push(job);
659        self
660    }
661
662    /// Adds multiple jobs.
663    #[must_use]
664    pub fn add_jobs(mut self, jobs: Vec<TranscodeConfig>) -> Self {
665        self.jobs.extend(jobs);
666        self
667    }
668
669    /// Builds the parallel encoder.
670    #[must_use]
671    pub fn build(self) -> ParallelEncoder {
672        let mut encoder = ParallelEncoder::new(self.config);
673        encoder.add_jobs(self.jobs);
674        encoder
675    }
676}
677
678impl Default for ParallelEncodeBuilder {
679    fn default() -> Self {
680        Self::new()
681    }
682}
683
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a job that only specifies its input and output paths.
    fn job(input: &str, output: &str) -> TranscodeConfig {
        TranscodeConfig {
            input: Some(input.to_string()),
            output: Some(output.to_string()),
            ..Default::default()
        }
    }

    /// Reads the little-endian `u32` that starts at `offset` in `bytes`.
    fn read_u32_le(bytes: &[u8], offset: usize) -> u32 {
        u32::from_le_bytes([
            bytes[offset],
            bytes[offset + 1],
            bytes[offset + 2],
            bytes[offset + 3],
        ])
    }

    // ── ParallelConfig / ParallelEncoder tests ───────────────────────────────

    #[test]
    fn test_parallel_config_default() {
        let config = ParallelConfig::default();
        assert!(config.max_parallel > 0);
        assert_eq!(config.priority, ParallelPriority::Normal);
        assert!(config.use_thread_pool);
    }

    #[test]
    fn test_parallel_config_validation() {
        let valid = ParallelConfig::with_max_parallel(4);
        assert!(valid.validate().is_ok());

        let invalid = ParallelConfig {
            max_parallel: 0,
            ..Default::default()
        };
        assert!(invalid.validate().is_err());
    }

    #[test]
    fn test_parallel_config_cores_validation() {
        let valid = ParallelConfig::default().cores_per_encode(2);
        assert!(valid.validate().is_ok());

        let invalid = ParallelConfig::default().cores_per_encode(0);
        assert!(invalid.validate().is_err());
    }

    #[test]
    fn test_parallel_encoder_job_count() {
        let mut encoder = ParallelEncoder::new(ParallelConfig::default());
        assert_eq!(encoder.job_count(), 0);

        encoder.add_job(job("/tmp/input.mp4", "/tmp/output.mp4"));
        assert_eq!(encoder.job_count(), 1);
    }

    #[test]
    fn test_parallel_encoder_add_jobs() {
        let mut encoder = ParallelEncoder::new(ParallelConfig::default());

        let jobs = vec![
            job("/tmp/input1.mp4", "/tmp/output1.mp4"),
            job("/tmp/input2.mp4", "/tmp/output2.mp4"),
        ];

        encoder.add_jobs(jobs);
        assert_eq!(encoder.job_count(), 2);
    }

    #[test]
    fn test_parallel_encoder_clear() {
        let mut encoder = ParallelEncoder::new(ParallelConfig::default());

        encoder.add_job(job("/tmp/input.mp4", "/tmp/output.mp4"));
        assert_eq!(encoder.job_count(), 1);

        encoder.clear();
        assert_eq!(encoder.job_count(), 0);
    }

    #[test]
    fn test_parallel_builder() {
        let encoder = ParallelEncodeBuilder::new()
            .max_parallel(4)
            .cores_per_encode(2)
            .priority(ParallelPriority::High)
            .add_job(job("/tmp/input.mp4", "/tmp/output.mp4"))
            .build();

        assert_eq!(encoder.config.max_parallel, 4);
        assert_eq!(encoder.config.cores_per_encode, Some(2));
        assert_eq!(encoder.config.priority, ParallelPriority::High);
        assert_eq!(encoder.job_count(), 1);
    }

    #[test]
    fn test_num_cpus() {
        let cpus = num_cpus();
        assert!(cpus > 0);
        assert!(cpus <= 1024); // Reasonable upper bound
    }

    // ── Av1TileConfig tests ───────────────────────────────────────────────────

    #[test]
    fn test_av1_tile_config_default() {
        let cfg = Av1TileConfig::default();
        assert_eq!(cfg.tile_cols(), 2);
        assert_eq!(cfg.tile_rows(), 2);
        assert_eq!(cfg.total_tiles(), 4);
        assert!(cfg.row_mt);
    }

    #[test]
    fn test_av1_tile_config_new_valid() {
        let cfg = Av1TileConfig::new(2, 1, 4).expect("valid config");
        assert_eq!(cfg.tile_cols(), 4);
        assert_eq!(cfg.tile_rows(), 2);
        assert_eq!(cfg.total_tiles(), 8);
    }

    #[test]
    fn test_av1_tile_config_new_invalid_cols() {
        let result = Av1TileConfig::new(7, 1, 0);
        assert!(result.is_err(), "log2 > 6 should fail");
    }

    #[test]
    fn test_av1_tile_config_new_invalid_rows() {
        let result = Av1TileConfig::new(1, 7, 0);
        assert!(result.is_err(), "log2 > 6 should fail");
    }

    #[test]
    fn test_av1_tile_config_auto_720p() {
        let cfg = Av1TileConfig::auto(1280, 720, 4);
        assert_eq!(cfg.tile_cols_log2, 1);
        assert_eq!(cfg.tile_rows_log2, 0);
    }

    #[test]
    fn test_av1_tile_config_auto_1080p() {
        let cfg = Av1TileConfig::auto(1920, 1080, 4);
        assert_eq!(cfg.tile_cols_log2, 1);
        assert_eq!(cfg.tile_rows_log2, 1);
    }

    #[test]
    fn test_av1_tile_config_auto_4k() {
        let cfg = Av1TileConfig::auto(3840, 2160, 8);
        assert_eq!(cfg.tile_cols_log2, 2);
        assert_eq!(cfg.tile_rows_log2, 2);
    }

    #[test]
    fn test_av1_tile_config_validate_ok() {
        let cfg = Av1TileConfig::new(1, 1, 0).expect("valid");
        // 1920×1080 with 2×2 tiles → 960×540 each — above 64×64 minimum
        assert!(cfg.validate_for_frame(1920, 1080).is_ok());
    }

    #[test]
    fn test_av1_tile_config_validate_too_small() {
        let cfg = Av1TileConfig::new(3, 3, 0).expect("valid config");
        // 256×256 with 8×8 tiles → 32×32 each — below 64×64 minimum
        assert!(cfg.validate_for_frame(256, 256).is_err());
    }

    // ── Av1TileParallelEncoder tests ──────────────────────────────────────────

    #[test]
    fn test_av1_tile_parallel_encoder_encode_frame() {
        // 512×512 RGBA frame; 2×2 tile grid (each tile 256×256)
        let cfg = Av1TileConfig::new(1, 1, 2).expect("valid");
        let mut encoder = Av1TileParallelEncoder::new(cfg, 512, 512).expect("encoder ok");

        let frame_data = vec![128u8; 512 * 512 * 4]; // grey RGBA
        let bitstream = encoder.encode_frame_rgba(&frame_data).expect("encode ok");

        // Bitstream must contain the 4-byte tile count header + at least 4 tiles.
        assert!(bitstream.len() >= 4, "bitstream should have header");
        let tile_count = read_u32_le(&bitstream, 0);
        assert_eq!(tile_count, 4, "should encode 4 tiles");

        // Stats should be updated.
        assert_eq!(encoder.stats().tiles_encoded, 4);
        assert!(encoder.stats().compressed_bytes > 0);
    }

    #[test]
    fn test_av1_tile_parallel_encoder_undersized_frame() {
        let cfg = Av1TileConfig::default();
        let mut encoder = Av1TileParallelEncoder::new(cfg, 256, 256).expect("encoder ok");

        // Provide only 1 byte — way too small.
        let result = encoder.encode_frame_rgba(&[0u8]);
        assert!(result.is_err(), "undersized frame should fail");
    }

    #[test]
    fn test_av1_tile_parallel_encoder_stats_reset() {
        let cfg = Av1TileConfig::new(1, 1, 2).expect("valid");
        let mut encoder = Av1TileParallelEncoder::new(cfg, 256, 256).expect("encoder ok");

        let frame_data = vec![0u8; 256 * 256 * 4];
        encoder.encode_frame_rgba(&frame_data).expect("encode ok");
        assert!(encoder.stats().tiles_encoded > 0);

        encoder.reset_stats();
        assert_eq!(encoder.stats().tiles_encoded, 0);
        assert_eq!(encoder.stats().compressed_bytes, 0);
    }

    // ── Av1TileStats tests ────────────────────────────────────────────────────

    #[test]
    fn test_av1_tile_stats_tiles_per_second() {
        let stats = Av1TileStats {
            tiles_encoded: 100,
            compressed_bytes: 50_000,
            wall_time_secs: 2.0,
        };
        assert!((stats.tiles_per_second() - 50.0).abs() < 1e-9);
        assert_eq!(stats.avg_bytes_per_tile(), 500);
    }

    #[test]
    fn test_av1_tile_stats_zero_time() {
        let stats = Av1TileStats::default();
        assert!((stats.tiles_per_second()).abs() < 1e-9);
        assert_eq!(stats.avg_bytes_per_tile(), 0);
    }

    // ── Bitstream assembly / placeholder compressor tests ─────────────────────

    #[test]
    fn test_assemble_av1_tile_bitstream_order() {
        // Tiles arrive out of order; assembled bitstream must sort them.
        let tiles = vec![(1, vec![1u8, 2, 3]), (0, vec![4u8, 5, 6])];
        let bs = assemble_av1_tile_bitstream(tiles);

        // Count = 2
        assert_eq!(read_u32_le(&bs, 0), 2);

        // First tile entry's index should be 0.
        assert_eq!(read_u32_le(&bs, 4), 0);
    }

    #[test]
    fn test_compress_tile_placeholder_empty() {
        let result = compress_tile_placeholder(&[]);
        assert!(result.is_empty());
    }

    #[test]
    fn test_compress_tile_placeholder_rle() {
        // 4 identical RGBA pixels (luma = 200) → single run of 4
        let rgba = vec![
            200u8, 0, 0, 255, 200, 0, 0, 255, 200, 0, 0, 255, 200, 0, 0, 255,
        ];
        let compressed = compress_tile_placeholder(&rgba);
        // Should produce [200, 4] (one run of length 4)
        assert_eq!(compressed, vec![200, 4]);
    }
}