audiobook_forge/core/
batch.rs

1//! Batch processor for parallel audiobook processing
2
3use crate::core::{Processor, RetryConfig, smart_retry_async};
4use crate::models::{BookFolder, ProcessingResult};
5use anyhow::Result;
6use std::path::Path;
7use std::sync::Arc;
8use tokio::sync::{mpsc, Semaphore};
9
10/// Batch processor for converting multiple audiobooks in parallel
11pub struct BatchProcessor {
12    /// Number of parallel workers
13    workers: usize,
14    /// Keep temporary files for debugging
15    keep_temp: bool,
16    /// Use Apple Silicon hardware encoder
17    use_apple_silicon: bool,
18    /// Enable parallel file encoding
19    enable_parallel_encoding: bool,
20    /// Maximum concurrent encoding operations (to limit CPU usage)
21    max_concurrent_encodes: usize,
22    /// Retry configuration
23    retry_config: RetryConfig,
24}
25
26impl BatchProcessor {
27    /// Create a new batch processor with default settings
28    pub fn new(workers: usize) -> Self {
29        Self {
30            workers: workers.clamp(1, 16),
31            keep_temp: false,
32            use_apple_silicon: false,
33            enable_parallel_encoding: true,
34            max_concurrent_encodes: 2, // Default: 2 concurrent encodes
35            retry_config: RetryConfig::new(),
36        }
37    }
38
39    /// Create batch processor with custom options
40    pub fn with_options(
41        workers: usize,
42        keep_temp: bool,
43        use_apple_silicon: bool,
44        enable_parallel_encoding: bool,
45        max_concurrent_encodes: usize,
46        retry_config: RetryConfig,
47    ) -> Self {
48        Self {
49            workers: workers.clamp(1, 16),
50            keep_temp,
51            use_apple_silicon,
52            enable_parallel_encoding,
53            max_concurrent_encodes: max_concurrent_encodes.clamp(1, 16),
54            retry_config,
55        }
56    }
57
58    /// Process multiple books in parallel
59    pub async fn process_batch(
60        &self,
61        books: Vec<BookFolder>,
62        output_dir: &Path,
63        chapter_source: &str,
64    ) -> Vec<ProcessingResult> {
65        let total_books = books.len();
66
67        if total_books == 0 {
68            return Vec::new();
69        }
70
71        tracing::info!(
72            "Starting batch processing: {} books with {} workers (max {} concurrent encodes)",
73            total_books,
74            self.workers,
75            self.max_concurrent_encodes
76        );
77
78        // Create a semaphore to limit concurrent encoding operations
79        let encode_semaphore = Arc::new(Semaphore::new(self.max_concurrent_encodes));
80
81        // Create channel for collecting results
82        let (result_tx, mut result_rx) = mpsc::channel(total_books);
83
84        // Spawn tasks for each book
85        let mut handles = Vec::new();
86
87        for (index, book) in books.into_iter().enumerate() {
88            let result_tx = result_tx.clone();
89            let output_dir = output_dir.to_path_buf();
90            let chapter_source = chapter_source.to_string();
91            let keep_temp = self.keep_temp;
92            let use_apple_silicon = self.use_apple_silicon;
93            let enable_parallel_encoding = self.enable_parallel_encoding;
94            let encode_semaphore = Arc::clone(&encode_semaphore);
95            let retry_config = self.retry_config.clone();
96
97            let handle = tokio::spawn(async move {
98                // Acquire semaphore permit before encoding (limits concurrent encodes)
99                let _permit = encode_semaphore.acquire().await.unwrap();
100
101                tracing::info!(
102                    "[{}/{}] Processing: {}",
103                    index + 1,
104                    total_books,
105                    book.name
106                );
107
108                // Process with retry logic
109                let result = smart_retry_async(&retry_config, || {
110                    Self::process_single_book(
111                        &book,
112                        &output_dir,
113                        &chapter_source,
114                        keep_temp,
115                        use_apple_silicon,
116                        enable_parallel_encoding,
117                    )
118                })
119                .await
120                .unwrap_or_else(|e| {
121                    // If all retries fail, return a failure result
122                    tracing::error!("✗ {}: {}", book.name, e);
123                    ProcessingResult::new(book.name.clone())
124                        .failure(format!("All retries failed: {}", e), 0.0)
125                });
126
127                // Send result through channel
128                let _ = result_tx.send(result).await;
129            });
130
131            handles.push(handle);
132        }
133
134        // Drop the original sender so the receiver knows when all tasks are done
135        drop(result_tx);
136
137        // Collect all results
138        let mut results = Vec::new();
139        while let Some(result) = result_rx.recv().await {
140            results.push(result);
141        }
142
143        // Wait for all tasks to complete
144        for handle in handles {
145            let _ = handle.await;
146        }
147
148        tracing::info!(
149            "Batch processing complete: {}/{} successful",
150            results.iter().filter(|r| r.success).count(),
151            results.len()
152        );
153
154        results
155    }
156
157    /// Process a single book (internal helper)
158    async fn process_single_book(
159        book: &BookFolder,
160        output_dir: &Path,
161        chapter_source: &str,
162        keep_temp: bool,
163        use_apple_silicon: bool,
164        enable_parallel_encoding: bool,
165    ) -> Result<ProcessingResult> {
166        let processor = Processor::with_options(keep_temp, use_apple_silicon, enable_parallel_encoding)?;
167
168        let result = processor
169            .process_book(book, output_dir, chapter_source)
170            .await?;
171
172        tracing::info!("✓ {}: {:.1}s", book.name, result.processing_time);
173        Ok(result)
174    }
175
176    /// Get recommended worker count based on system
177    pub fn recommended_workers() -> usize {
178        let cpu_count = num_cpus::get();
179
180        // Use 50% of CPU cores for parallel processing
181        // (reserves cores for FFmpeg itself which is multi-threaded)
182        (cpu_count / 2).max(1).min(8)
183    }
184}
185
186impl Default for BatchProcessor {
187    fn default() -> Self {
188        Self::new(Self::recommended_workers())
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` applies the documented defaults around the given worker count.
    #[test]
    fn test_batch_processor_creation() {
        let bp = BatchProcessor::new(4);
        assert_eq!(bp.workers, 4);
        assert_eq!(bp.max_concurrent_encodes, 2);
        assert!(!bp.keep_temp);
        assert!(!bp.use_apple_silicon);
    }

    /// `with_options` stores every supplied option verbatim (within bounds).
    #[test]
    fn test_batch_processor_with_options() {
        let bp = BatchProcessor::with_options(8, true, true, true, 4, RetryConfig::new());
        assert_eq!(bp.workers, 8);
        assert_eq!(bp.max_concurrent_encodes, 4);
        assert!(bp.keep_temp);
        assert!(bp.use_apple_silicon);
    }

    /// Worker counts outside 1..=16 are clamped at both ends.
    #[test]
    fn test_worker_clamping() {
        assert_eq!(BatchProcessor::new(0).workers, 1); // lower bound
        assert_eq!(BatchProcessor::new(100).workers, 16); // upper bound
    }

    /// Encode concurrency outside 1..=16 is clamped at both ends.
    #[test]
    fn test_concurrent_encode_clamping() {
        let low = BatchProcessor::with_options(4, false, false, true, 0, RetryConfig::new());
        assert_eq!(low.max_concurrent_encodes, 1);

        let high = BatchProcessor::with_options(4, false, false, true, 100, RetryConfig::new());
        assert_eq!(high.max_concurrent_encodes, 16);
    }

    /// The recommendation always lands in the documented 1..=8 range.
    #[test]
    fn test_recommended_workers() {
        let workers = BatchProcessor::recommended_workers();
        assert!((1..=8).contains(&workers));
    }

    /// An empty batch completes immediately with no results.
    #[tokio::test]
    async fn test_empty_batch() {
        let bp = BatchProcessor::new(4);
        let results = bp.process_batch(Vec::new(), Path::new("/tmp"), "auto").await;
        assert!(results.is_empty());
    }
}