audiobook_forge/core/
batch.rs

1//! Batch processor for parallel audiobook processing
2
3use crate::core::{Processor, RetryConfig, smart_retry_async};
4use crate::models::{BookFolder, ProcessingResult};
5use anyhow::Result;
6use std::path::Path;
7use std::sync::Arc;
8use tokio::sync::{mpsc, Semaphore};
9
/// Batch processor for converting multiple audiobooks in parallel.
///
/// Concurrency in `process_batch` is bounded by a semaphore sized to
/// `max_concurrent_encodes`; see the review note on `workers` below.
pub struct BatchProcessor {
    /// Number of parallel workers (clamped to 1..=16 by the constructors).
    ///
    /// NOTE(review): this value is only logged in `process_batch`; the actual
    /// parallelism there is limited by `max_concurrent_encodes` — confirm intent.
    workers: usize,
    /// Keep temporary files for debugging (default: false)
    keep_temp: bool,
    /// Use Apple Silicon hardware encoder (default: false)
    use_apple_silicon: bool,
    /// Enable parallel file encoding within a book (default: true)
    enable_parallel_encoding: bool,
    /// Maximum concurrent encoding operations across all books, to limit
    /// CPU usage (clamped to 1..=16 by `with_options`)
    max_concurrent_encodes: usize,
    /// Maximum concurrent file encodings per book
    /// (clamped to 1..=32 by `with_options`)
    max_concurrent_files: usize,
    /// Retry configuration applied per book via `smart_retry_async`
    retry_config: RetryConfig,
}
27
28impl BatchProcessor {
29    /// Create a new batch processor with default settings
30    pub fn new(workers: usize) -> Self {
31        Self {
32            workers: workers.clamp(1, 16),
33            keep_temp: false,
34            use_apple_silicon: false,
35            enable_parallel_encoding: true,
36            max_concurrent_encodes: 2, // Default: 2 concurrent encodes
37            max_concurrent_files: 8, // Default: 8 concurrent files per book
38            retry_config: RetryConfig::new(),
39        }
40    }
41
42    /// Create batch processor with custom options
43    pub fn with_options(
44        workers: usize,
45        keep_temp: bool,
46        use_apple_silicon: bool,
47        enable_parallel_encoding: bool,
48        max_concurrent_encodes: usize,
49        max_concurrent_files: usize,
50        retry_config: RetryConfig,
51    ) -> Self {
52        Self {
53            workers: workers.clamp(1, 16),
54            keep_temp,
55            use_apple_silicon,
56            enable_parallel_encoding,
57            max_concurrent_encodes: max_concurrent_encodes.clamp(1, 16),
58            max_concurrent_files: max_concurrent_files.clamp(1, 32),
59            retry_config,
60        }
61    }
62
63    /// Process multiple books in parallel
64    pub async fn process_batch(
65        &self,
66        books: Vec<BookFolder>,
67        output_dir: &Path,
68        chapter_source: &str,
69    ) -> Vec<ProcessingResult> {
70        let total_books = books.len();
71
72        if total_books == 0 {
73            return Vec::new();
74        }
75
76        tracing::info!(
77            "Starting batch processing: {} books with {} workers (max {} concurrent encodes)",
78            total_books,
79            self.workers,
80            self.max_concurrent_encodes
81        );
82
83        // Create a semaphore to limit concurrent encoding operations
84        let encode_semaphore = Arc::new(Semaphore::new(self.max_concurrent_encodes));
85
86        // Create channel for collecting results
87        let (result_tx, mut result_rx) = mpsc::channel(total_books);
88
89        // Spawn tasks for each book
90        let mut handles = Vec::new();
91
92        for (index, book) in books.into_iter().enumerate() {
93            let result_tx = result_tx.clone();
94            let output_dir = output_dir.to_path_buf();
95            let chapter_source = chapter_source.to_string();
96            let keep_temp = self.keep_temp;
97            let use_apple_silicon = self.use_apple_silicon;
98            let enable_parallel_encoding = self.enable_parallel_encoding;
99            let max_concurrent_files = self.max_concurrent_files;
100            let encode_semaphore = Arc::clone(&encode_semaphore);
101            let retry_config = self.retry_config.clone();
102
103            let handle = tokio::spawn(async move {
104                // Acquire semaphore permit before encoding (limits concurrent encodes)
105                let _permit = encode_semaphore.acquire().await.unwrap();
106
107                tracing::info!(
108                    "[{}/{}] Processing: {}",
109                    index + 1,
110                    total_books,
111                    book.name
112                );
113
114                // Process with retry logic
115                let result = smart_retry_async(&retry_config, || {
116                    Self::process_single_book(
117                        &book,
118                        &output_dir,
119                        &chapter_source,
120                        keep_temp,
121                        use_apple_silicon,
122                        enable_parallel_encoding,
123                        max_concurrent_files,
124                    )
125                })
126                .await
127                .unwrap_or_else(|e| {
128                    // If all retries fail, return a failure result
129                    tracing::error!("✗ {}: {}", book.name, e);
130                    ProcessingResult::new(book.name.clone())
131                        .failure(format!("All retries failed: {}", e), 0.0)
132                });
133
134                // Send result through channel
135                let _ = result_tx.send(result).await;
136            });
137
138            handles.push(handle);
139        }
140
141        // Drop the original sender so the receiver knows when all tasks are done
142        drop(result_tx);
143
144        // Collect all results
145        let mut results = Vec::new();
146        while let Some(result) = result_rx.recv().await {
147            results.push(result);
148        }
149
150        // Wait for all tasks to complete
151        for handle in handles {
152            let _ = handle.await;
153        }
154
155        tracing::info!(
156            "Batch processing complete: {}/{} successful",
157            results.iter().filter(|r| r.success).count(),
158            results.len()
159        );
160
161        results
162    }
163
164    /// Process a single book (internal helper)
165    async fn process_single_book(
166        book: &BookFolder,
167        output_dir: &Path,
168        chapter_source: &str,
169        keep_temp: bool,
170        use_apple_silicon: bool,
171        enable_parallel_encoding: bool,
172        max_concurrent_files: usize,
173    ) -> Result<ProcessingResult> {
174        let processor = Processor::with_options(
175            keep_temp,
176            use_apple_silicon,
177            enable_parallel_encoding,
178            max_concurrent_files,
179        )?;
180
181        let result = processor
182            .process_book(book, output_dir, chapter_source)
183            .await?;
184
185        tracing::info!("✓ {}: {:.1}s", book.name, result.processing_time);
186        Ok(result)
187    }
188
189    /// Get recommended worker count based on system
190    pub fn recommended_workers() -> usize {
191        let cpu_count = num_cpus::get();
192
193        // Use 50% of CPU cores for parallel processing
194        // (reserves cores for FFmpeg itself which is multi-threaded)
195        (cpu_count / 2).max(1).min(8)
196    }
197}
198
199impl Default for BatchProcessor {
200    fn default() -> Self {
201        Self::new(Self::recommended_workers())
202    }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Default construction keeps the requested worker count and defaults.
    #[test]
    fn test_batch_processor_creation() {
        let bp = BatchProcessor::new(4);
        assert_eq!(bp.workers, 4);
        assert_eq!(bp.max_concurrent_encodes, 2);
        assert!(!bp.keep_temp);
        assert!(!bp.use_apple_silicon);
    }

    /// Custom options are stored as given when inside their clamp ranges.
    #[test]
    fn test_batch_processor_with_options() {
        let bp = BatchProcessor::with_options(8, true, true, true, 4, 8, RetryConfig::new());
        assert_eq!(bp.workers, 8);
        assert_eq!(bp.max_concurrent_encodes, 4);
        assert_eq!(bp.max_concurrent_files, 8);
        assert!(bp.keep_temp);
        assert!(bp.use_apple_silicon);
    }

    /// Worker counts are clamped to the 1..=16 range at both ends.
    #[test]
    fn test_worker_clamping() {
        assert_eq!(BatchProcessor::new(0).workers, 1);
        assert_eq!(BatchProcessor::new(100).workers, 16);
    }

    /// Concurrent-encode counts are clamped to the 1..=16 range at both ends.
    #[test]
    fn test_concurrent_encode_clamping() {
        let low = BatchProcessor::with_options(4, false, false, true, 0, 8, RetryConfig::new());
        assert_eq!(low.max_concurrent_encodes, 1);

        let high = BatchProcessor::with_options(4, false, false, true, 100, 8, RetryConfig::new());
        assert_eq!(high.max_concurrent_encodes, 16);
    }

    /// The recommendation always lands in the 1..=8 range.
    #[test]
    fn test_recommended_workers() {
        let workers = BatchProcessor::recommended_workers();
        assert!((1..=8).contains(&workers));
    }

    /// An empty batch short-circuits to an empty result set.
    #[tokio::test]
    async fn test_empty_batch() {
        let bp = BatchProcessor::new(4);
        let results = bp
            .process_batch(Vec::new(), Path::new("/tmp"), "auto")
            .await;
        assert!(results.is_empty());
    }
}