audiobook_forge/core/
batch.rs

1//! Batch processor for parallel audiobook processing
2
3use crate::audio::AacEncoder;
4use crate::core::{Processor, RetryConfig, smart_retry_async};
5use crate::models::{BookFolder, ProcessingResult};
6use anyhow::Result;
7use std::path::Path;
8use std::sync::Arc;
9use tokio::sync::{mpsc, Semaphore};
10
11/// Batch processor for converting multiple audiobooks in parallel
12pub struct BatchProcessor {
13    /// Number of parallel workers (clamped to 1..=16 by the constructors)
14    workers: usize,
15    /// Keep temporary files for debugging
16    keep_temp: bool,
17    /// AAC encoder to use (auto-detected by `new`, explicit in `with_options`)
18    encoder: AacEncoder,
19    /// Enable parallel file encoding within a single book
20    enable_parallel_encoding: bool,
21    /// Maximum concurrent encoding operations across the whole batch,
22    /// enforced via a semaphore in `process_batch` (clamped to 1..=16)
23    max_concurrent_encodes: usize,
24    /// Maximum concurrent file encodings per book (clamped to 1..=32)
25    max_concurrent_files: usize,
26    /// Retry configuration applied to each book via `smart_retry_async`
27    retry_config: RetryConfig,
28}
28
29impl BatchProcessor {
30    /// Create a new batch processor with default settings
31    pub fn new(workers: usize) -> Self {
32        Self {
33            workers: workers.clamp(1, 16),
34            keep_temp: false,
35            encoder: crate::audio::get_encoder(),
36            enable_parallel_encoding: true,
37            max_concurrent_encodes: 2, // Default: 2 concurrent encodes
38            max_concurrent_files: 8, // Default: 8 concurrent files per book
39            retry_config: RetryConfig::new(),
40        }
41    }
42
43    /// Create batch processor with custom options
44    pub fn with_options(
45        workers: usize,
46        keep_temp: bool,
47        encoder: AacEncoder,
48        enable_parallel_encoding: bool,
49        max_concurrent_encodes: usize,
50        max_concurrent_files: usize,
51        retry_config: RetryConfig,
52    ) -> Self {
53        Self {
54            workers: workers.clamp(1, 16),
55            keep_temp,
56            encoder,
57            enable_parallel_encoding,
58            max_concurrent_encodes: max_concurrent_encodes.clamp(1, 16),
59            max_concurrent_files: max_concurrent_files.clamp(1, 32),
60            retry_config,
61        }
62    }
63
64    /// Process multiple books in parallel
65    pub async fn process_batch(
66        &self,
67        books: Vec<BookFolder>,
68        output_dir: &Path,
69        chapter_source: &str,
70    ) -> Vec<ProcessingResult> {
71        let total_books = books.len();
72
73        if total_books == 0 {
74            return Vec::new();
75        }
76
77        tracing::info!(
78            "Starting batch processing: {} books with {} workers (max {} concurrent encodes)",
79            total_books,
80            self.workers,
81            self.max_concurrent_encodes
82        );
83
84        // Create a semaphore to limit concurrent encoding operations
85        let encode_semaphore = Arc::new(Semaphore::new(self.max_concurrent_encodes));
86
87        // Create channel for collecting results
88        let (result_tx, mut result_rx) = mpsc::channel(total_books);
89
90        // Spawn tasks for each book
91        let mut handles = Vec::new();
92
93        for (index, book) in books.into_iter().enumerate() {
94            let result_tx = result_tx.clone();
95            let output_dir = output_dir.to_path_buf();
96            let chapter_source = chapter_source.to_string();
97            let keep_temp = self.keep_temp;
98            let encoder = self.encoder;
99            let enable_parallel_encoding = self.enable_parallel_encoding;
100            let max_concurrent_files = self.max_concurrent_files;
101            let encode_semaphore = Arc::clone(&encode_semaphore);
102            let retry_config = self.retry_config.clone();
103
104            let handle = tokio::spawn(async move {
105                // Acquire semaphore permit before encoding (limits concurrent encodes)
106                let _permit = encode_semaphore.acquire().await.unwrap();
107
108                tracing::info!(
109                    "[{}/{}] Processing: {}",
110                    index + 1,
111                    total_books,
112                    book.name
113                );
114
115                // Process with retry logic
116                let result = smart_retry_async(&retry_config, || {
117                    Self::process_single_book(
118                        &book,
119                        &output_dir,
120                        &chapter_source,
121                        keep_temp,
122                        encoder,
123                        enable_parallel_encoding,
124                        max_concurrent_files,
125                    )
126                })
127                .await
128                .unwrap_or_else(|e| {
129                    // If all retries fail, return a failure result
130                    tracing::error!("✗ {}: {:?}", book.name, e);
131                    ProcessingResult::new(book.name.clone())
132                        .failure(format!("All retries failed: {:?}", e), 0.0)
133                });
134
135                // Send result through channel
136                let _ = result_tx.send(result).await;
137            });
138
139            handles.push(handle);
140        }
141
142        // Drop the original sender so the receiver knows when all tasks are done
143        drop(result_tx);
144
145        // Collect all results
146        let mut results = Vec::new();
147        while let Some(result) = result_rx.recv().await {
148            results.push(result);
149        }
150
151        // Wait for all tasks to complete
152        for handle in handles {
153            let _ = handle.await;
154        }
155
156        tracing::info!(
157            "Batch processing complete: {}/{} successful",
158            results.iter().filter(|r| r.success).count(),
159            results.len()
160        );
161
162        results
163    }
164
165    /// Process a single book (internal helper)
166    async fn process_single_book(
167        book: &BookFolder,
168        output_dir: &Path,
169        chapter_source: &str,
170        keep_temp: bool,
171        encoder: AacEncoder,
172        enable_parallel_encoding: bool,
173        max_concurrent_files: usize,
174    ) -> Result<ProcessingResult> {
175        let processor = Processor::with_options(
176            keep_temp,
177            encoder,
178            enable_parallel_encoding,
179            max_concurrent_files,
180        )?;
181
182        let result = processor
183            .process_book(book, output_dir, chapter_source)
184            .await?;
185
186        tracing::info!("✓ {}: {:.1}s", book.name, result.processing_time);
187        Ok(result)
188    }
189
190    /// Get recommended worker count based on system
191    pub fn recommended_workers() -> usize {
192        let cpu_count = num_cpus::get();
193
194        // Use 50% of CPU cores for parallel processing
195        // (reserves cores for FFmpeg itself which is multi-threaded)
196        (cpu_count / 2).max(1).min(8)
197    }
198}
199
200impl Default for BatchProcessor {
201    fn default() -> Self {
202        Self::new(Self::recommended_workers())
203    }
204}
205
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_batch_processor_creation() {
        let bp = BatchProcessor::new(4);

        assert_eq!(bp.workers, 4);
        assert_eq!(bp.max_concurrent_encodes, 2);
        assert!(!bp.keep_temp);

        // Encoder is auto-detected; it must be one of the known variants.
        let encoder_is_valid = matches!(
            bp.encoder,
            AacEncoder::AppleSilicon | AacEncoder::LibFdk | AacEncoder::Native
        );
        assert!(encoder_is_valid);
    }

    #[test]
    fn test_batch_processor_with_options() {
        let bp = BatchProcessor::with_options(
            8,
            true,
            AacEncoder::AppleSilicon,
            true,
            4,
            8,
            RetryConfig::new(),
        );

        assert_eq!(bp.workers, 8);
        assert_eq!(bp.max_concurrent_encodes, 4);
        assert_eq!(bp.max_concurrent_files, 8);
        assert!(bp.keep_temp);
        assert_eq!(bp.encoder, AacEncoder::AppleSilicon);
    }

    #[test]
    fn test_worker_clamping() {
        // Below the lower bound: clamped up to 1.
        assert_eq!(BatchProcessor::new(0).workers, 1);

        // Above the upper bound: clamped down to 16.
        assert_eq!(BatchProcessor::new(100).workers, 16);
    }

    #[test]
    fn test_concurrent_encode_clamping() {
        let low = BatchProcessor::with_options(
            4, false, AacEncoder::Native, true, 0, 8, RetryConfig::new(),
        );
        assert_eq!(low.max_concurrent_encodes, 1);

        let high = BatchProcessor::with_options(
            4, false, AacEncoder::Native, true, 100, 8, RetryConfig::new(),
        );
        assert_eq!(high.max_concurrent_encodes, 16);
    }

    #[test]
    fn test_recommended_workers() {
        let workers = BatchProcessor::recommended_workers();
        assert!((1..=8).contains(&workers));
    }

    #[tokio::test]
    async fn test_empty_batch() {
        let bp = BatchProcessor::new(4);
        let results = bp.process_batch(Vec::new(), Path::new("/tmp"), "auto").await;
        assert_eq!(results.len(), 0);
    }
}