// audiobook_forge/core/batch.rs

1//! Batch processor for parallel audiobook processing
2
3use crate::audio::AacEncoder;
4use crate::core::{Processor, RetryConfig, smart_retry_async};
5use crate::models::{BookFolder, ProcessingResult};
6use anyhow::Result;
7use std::path::Path;
8use std::sync::Arc;
9use tokio::sync::{mpsc, Semaphore};
10
11/// Batch processor for converting multiple audiobooks in parallel
/// Batch processor for converting multiple audiobooks in parallel.
///
/// Construct via [`BatchProcessor::new`], [`BatchProcessor::with_options`],
/// or `Default` (which sizes workers to the host CPU).
pub struct BatchProcessor {
    /// Number of parallel workers (clamped to 1..=16 by the constructors).
    /// NOTE(review): currently only reported in logs; actual concurrency is
    /// bounded by `max_concurrent_encodes` — confirm intended semantics.
    workers: usize,
    /// Keep temporary files for debugging
    keep_temp: bool,
    /// AAC encoder to use
    encoder: AacEncoder,
    /// Enable parallel file encoding
    enable_parallel_encoding: bool,
    /// Maximum concurrent encoding operations (to limit CPU usage);
    /// enforced via a semaphore in `process_batch` (clamped to 1..=16).
    max_concurrent_encodes: usize,
    /// Maximum concurrent file encodings per book (clamped to 1..=32).
    max_concurrent_files: usize,
    /// Quality preset override (`None` = use the processor's default)
    quality_preset: Option<String>,
    /// Retry configuration applied to each book via `smart_retry_async`
    retry_config: RetryConfig,
}
30
31impl BatchProcessor {
32    /// Create a new batch processor with default settings
33    pub fn new(workers: usize) -> Self {
34        Self {
35            workers: workers.clamp(1, 16),
36            keep_temp: false,
37            encoder: crate::audio::get_encoder(),
38            enable_parallel_encoding: true,
39            max_concurrent_encodes: 2, // Default: 2 concurrent encodes
40            max_concurrent_files: 8, // Default: 8 concurrent files per book
41            quality_preset: None,
42            retry_config: RetryConfig::new(),
43        }
44    }
45
46    /// Create batch processor with custom options
47    pub fn with_options(
48        workers: usize,
49        keep_temp: bool,
50        encoder: AacEncoder,
51        enable_parallel_encoding: bool,
52        max_concurrent_encodes: usize,
53        max_concurrent_files: usize,
54        quality_preset: Option<String>,
55        retry_config: RetryConfig,
56    ) -> Self {
57        Self {
58            workers: workers.clamp(1, 16),
59            keep_temp,
60            encoder,
61            enable_parallel_encoding,
62            max_concurrent_encodes: max_concurrent_encodes.clamp(1, 16),
63            max_concurrent_files: max_concurrent_files.clamp(1, 32),
64            quality_preset,
65            retry_config,
66        }
67    }
68
69    /// Process multiple books in parallel
70    pub async fn process_batch(
71        &self,
72        books: Vec<BookFolder>,
73        output_dir: &Path,
74        chapter_source: &str,
75    ) -> Vec<ProcessingResult> {
76        let total_books = books.len();
77
78        if total_books == 0 {
79            return Vec::new();
80        }
81
82        tracing::info!(
83            "Starting batch processing: {} books with {} workers (max {} concurrent encodes)",
84            total_books,
85            self.workers,
86            self.max_concurrent_encodes
87        );
88
89        // Create a semaphore to limit concurrent encoding operations
90        let encode_semaphore = Arc::new(Semaphore::new(self.max_concurrent_encodes));
91
92        // Create channel for collecting results
93        let (result_tx, mut result_rx) = mpsc::channel(total_books);
94
95        // Spawn tasks for each book
96        let mut handles = Vec::new();
97
98        for (index, book) in books.into_iter().enumerate() {
99            let result_tx = result_tx.clone();
100            let output_dir = output_dir.to_path_buf();
101            let chapter_source = chapter_source.to_string();
102            let keep_temp = self.keep_temp;
103            let encoder = self.encoder;
104            let enable_parallel_encoding = self.enable_parallel_encoding;
105            let max_concurrent_files = self.max_concurrent_files;
106            let quality_preset = self.quality_preset.clone();
107            let encode_semaphore = Arc::clone(&encode_semaphore);
108            let retry_config = self.retry_config.clone();
109
110            let handle = tokio::spawn(async move {
111                // Acquire semaphore permit before encoding (limits concurrent encodes)
112                let _permit = encode_semaphore.acquire().await.unwrap();
113
114                tracing::info!(
115                    "[{}/{}] Processing: {}",
116                    index + 1,
117                    total_books,
118                    book.name
119                );
120
121                // Process with retry logic
122                let result = smart_retry_async(&retry_config, || {
123                    Self::process_single_book(
124                        &book,
125                        &output_dir,
126                        &chapter_source,
127                        keep_temp,
128                        encoder,
129                        enable_parallel_encoding,
130                        max_concurrent_files,
131                        quality_preset.clone(),
132                    )
133                })
134                .await
135                .unwrap_or_else(|e| {
136                    // If all retries fail, return a failure result
137                    tracing::error!("✗ {}: {:?}", book.name, e);
138                    ProcessingResult::new(book.name.clone())
139                        .failure(format!("All retries failed: {:?}", e), 0.0)
140                });
141
142                // Send result through channel
143                let _ = result_tx.send(result).await;
144            });
145
146            handles.push(handle);
147        }
148
149        // Drop the original sender so the receiver knows when all tasks are done
150        drop(result_tx);
151
152        // Collect all results
153        let mut results = Vec::new();
154        while let Some(result) = result_rx.recv().await {
155            results.push(result);
156        }
157
158        // Wait for all tasks to complete
159        for handle in handles {
160            let _ = handle.await;
161        }
162
163        tracing::info!(
164            "Batch processing complete: {}/{} successful",
165            results.iter().filter(|r| r.success).count(),
166            results.len()
167        );
168
169        results
170    }
171
172    /// Process a single book (internal helper)
173    async fn process_single_book(
174        book: &BookFolder,
175        output_dir: &Path,
176        chapter_source: &str,
177        keep_temp: bool,
178        encoder: AacEncoder,
179        enable_parallel_encoding: bool,
180        max_concurrent_files: usize,
181        quality_preset: Option<String>,
182    ) -> Result<ProcessingResult> {
183        let processor = Processor::with_options(
184            keep_temp,
185            encoder,
186            enable_parallel_encoding,
187            max_concurrent_files,
188            quality_preset,
189        )?;
190
191        let result = processor
192            .process_book(book, output_dir, chapter_source)
193            .await?;
194
195        tracing::info!("✓ {}: {:.1}s", book.name, result.processing_time);
196        Ok(result)
197    }
198
199    /// Get recommended worker count based on system
200    pub fn recommended_workers() -> usize {
201        let cpu_count = num_cpus::get();
202
203        // Use 50% of CPU cores for parallel processing
204        // (reserves cores for FFmpeg itself which is multi-threaded)
205        (cpu_count / 2).max(1).min(8)
206    }
207}
208
209impl Default for BatchProcessor {
210    fn default() -> Self {
211        Self::new(Self::recommended_workers())
212    }
213}
214
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_batch_processor_creation() {
        let bp = BatchProcessor::new(4);

        assert_eq!(bp.workers, 4);
        assert_eq!(bp.max_concurrent_encodes, 2);
        assert!(!bp.keep_temp);

        // The encoder is auto-detected at construction time; we can only
        // check it is one of the known variants.
        let valid = matches!(
            bp.encoder,
            AacEncoder::AppleSilicon | AacEncoder::LibFdk | AacEncoder::Native
        );
        assert!(valid);
    }

    #[test]
    fn test_batch_processor_with_options() {
        let bp = BatchProcessor::with_options(
            8,
            true,
            AacEncoder::AppleSilicon,
            true,
            4,
            8,
            None,
            RetryConfig::new(),
        );

        assert_eq!(bp.workers, 8);
        assert_eq!(bp.max_concurrent_encodes, 4);
        assert_eq!(bp.max_concurrent_files, 8);
        assert!(bp.keep_temp);
        assert_eq!(bp.encoder, AacEncoder::AppleSilicon);
    }

    #[test]
    fn test_worker_clamping() {
        // Below the minimum -> clamped up to 1.
        assert_eq!(BatchProcessor::new(0).workers, 1);
        // Above the maximum -> clamped down to 16.
        assert_eq!(BatchProcessor::new(100).workers, 16);
    }

    #[test]
    fn test_concurrent_encode_clamping() {
        let make = |encodes| {
            BatchProcessor::with_options(
                4,
                false,
                AacEncoder::Native,
                true,
                encodes,
                8,
                None,
                RetryConfig::new(),
            )
        };

        assert_eq!(make(0).max_concurrent_encodes, 1);
        assert_eq!(make(100).max_concurrent_encodes, 16);
    }

    #[test]
    fn test_recommended_workers() {
        let n = BatchProcessor::recommended_workers();
        // Always within the documented 1..=8 range regardless of host CPU.
        assert!((1..=8).contains(&n));
    }

    #[tokio::test]
    async fn test_empty_batch() {
        let bp = BatchProcessor::new(4);
        let out = bp.process_batch(Vec::new(), Path::new("/tmp"), "auto").await;
        assert!(out.is_empty());
    }
}