1use crate::audio::AacEncoder;
4use crate::core::{Processor, RetryConfig, smart_retry_async};
5use crate::models::{BookFolder, ProcessingResult};
6use anyhow::Result;
7use std::path::Path;
8use std::sync::Arc;
9use tokio::sync::{mpsc, Semaphore};
10
/// Orchestrates concurrent processing of multiple audiobook folders.
pub struct BatchProcessor {
    // Target worker count (clamped to 1..=16 by the constructors).
    workers: usize,
    // Keep intermediate/temp files after processing instead of cleaning up.
    keep_temp: bool,
    // AAC encoder backend applied to every book in the batch.
    encoder: AacEncoder,
    // Encode files within a single book in parallel.
    enable_parallel_encoding: bool,
    // Width of the semaphore gating how many books process at once
    // (clamped to 1..=16 by `with_options`).
    max_concurrent_encodes: usize,
    // Per-book limit on concurrently encoded files (clamped to 1..=32).
    max_concurrent_files: usize,
    // Retry policy applied to each book via `smart_retry_async`.
    retry_config: RetryConfig,
}
28
29impl BatchProcessor {
30 pub fn new(workers: usize) -> Self {
32 Self {
33 workers: workers.clamp(1, 16),
34 keep_temp: false,
35 encoder: crate::audio::get_encoder(),
36 enable_parallel_encoding: true,
37 max_concurrent_encodes: 2, max_concurrent_files: 8, retry_config: RetryConfig::new(),
40 }
41 }
42
43 pub fn with_options(
45 workers: usize,
46 keep_temp: bool,
47 encoder: AacEncoder,
48 enable_parallel_encoding: bool,
49 max_concurrent_encodes: usize,
50 max_concurrent_files: usize,
51 retry_config: RetryConfig,
52 ) -> Self {
53 Self {
54 workers: workers.clamp(1, 16),
55 keep_temp,
56 encoder,
57 enable_parallel_encoding,
58 max_concurrent_encodes: max_concurrent_encodes.clamp(1, 16),
59 max_concurrent_files: max_concurrent_files.clamp(1, 32),
60 retry_config,
61 }
62 }
63
64 pub async fn process_batch(
66 &self,
67 books: Vec<BookFolder>,
68 output_dir: &Path,
69 chapter_source: &str,
70 ) -> Vec<ProcessingResult> {
71 let total_books = books.len();
72
73 if total_books == 0 {
74 return Vec::new();
75 }
76
77 tracing::info!(
78 "Starting batch processing: {} books with {} workers (max {} concurrent encodes)",
79 total_books,
80 self.workers,
81 self.max_concurrent_encodes
82 );
83
84 let encode_semaphore = Arc::new(Semaphore::new(self.max_concurrent_encodes));
86
87 let (result_tx, mut result_rx) = mpsc::channel(total_books);
89
90 let mut handles = Vec::new();
92
93 for (index, book) in books.into_iter().enumerate() {
94 let result_tx = result_tx.clone();
95 let output_dir = output_dir.to_path_buf();
96 let chapter_source = chapter_source.to_string();
97 let keep_temp = self.keep_temp;
98 let encoder = self.encoder;
99 let enable_parallel_encoding = self.enable_parallel_encoding;
100 let max_concurrent_files = self.max_concurrent_files;
101 let encode_semaphore = Arc::clone(&encode_semaphore);
102 let retry_config = self.retry_config.clone();
103
104 let handle = tokio::spawn(async move {
105 let _permit = encode_semaphore.acquire().await.unwrap();
107
108 tracing::info!(
109 "[{}/{}] Processing: {}",
110 index + 1,
111 total_books,
112 book.name
113 );
114
115 let result = smart_retry_async(&retry_config, || {
117 Self::process_single_book(
118 &book,
119 &output_dir,
120 &chapter_source,
121 keep_temp,
122 encoder,
123 enable_parallel_encoding,
124 max_concurrent_files,
125 )
126 })
127 .await
128 .unwrap_or_else(|e| {
129 tracing::error!("✗ {}: {:?}", book.name, e);
131 ProcessingResult::new(book.name.clone())
132 .failure(format!("All retries failed: {:?}", e), 0.0)
133 });
134
135 let _ = result_tx.send(result).await;
137 });
138
139 handles.push(handle);
140 }
141
142 drop(result_tx);
144
145 let mut results = Vec::new();
147 while let Some(result) = result_rx.recv().await {
148 results.push(result);
149 }
150
151 for handle in handles {
153 let _ = handle.await;
154 }
155
156 tracing::info!(
157 "Batch processing complete: {}/{} successful",
158 results.iter().filter(|r| r.success).count(),
159 results.len()
160 );
161
162 results
163 }
164
165 async fn process_single_book(
167 book: &BookFolder,
168 output_dir: &Path,
169 chapter_source: &str,
170 keep_temp: bool,
171 encoder: AacEncoder,
172 enable_parallel_encoding: bool,
173 max_concurrent_files: usize,
174 ) -> Result<ProcessingResult> {
175 let processor = Processor::with_options(
176 keep_temp,
177 encoder,
178 enable_parallel_encoding,
179 max_concurrent_files,
180 )?;
181
182 let result = processor
183 .process_book(book, output_dir, chapter_source)
184 .await?;
185
186 tracing::info!("✓ {}: {:.1}s", book.name, result.processing_time);
187 Ok(result)
188 }
189
190 pub fn recommended_workers() -> usize {
192 let cpu_count = num_cpus::get();
193
194 (cpu_count / 2).max(1).min(8)
197 }
198}
199
200impl Default for BatchProcessor {
201 fn default() -> Self {
202 Self::new(Self::recommended_workers())
203 }
204}
205
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_batch_processor_creation() {
        let bp = BatchProcessor::new(4);

        assert_eq!(bp.workers, 4);
        assert_eq!(bp.max_concurrent_encodes, 2);
        assert!(!bp.keep_temp);
        // Default encoder must be one of the known backends.
        assert!(matches!(
            bp.encoder,
            AacEncoder::AppleSilicon | AacEncoder::LibFdk | AacEncoder::Native
        ));
    }

    #[test]
    fn test_batch_processor_with_options() {
        let bp = BatchProcessor::with_options(
            8,
            true,
            AacEncoder::AppleSilicon,
            true,
            4,
            8,
            RetryConfig::new(),
        );

        assert_eq!(bp.workers, 8);
        assert_eq!(bp.max_concurrent_encodes, 4);
        assert_eq!(bp.max_concurrent_files, 8);
        assert!(bp.keep_temp);
        assert_eq!(bp.encoder, AacEncoder::AppleSilicon);
    }

    #[test]
    fn test_worker_clamping() {
        // Too small is floored to 1; too large is capped at 16.
        assert_eq!(BatchProcessor::new(0).workers, 1);
        assert_eq!(BatchProcessor::new(100).workers, 16);
    }

    #[test]
    fn test_concurrent_encode_clamping() {
        let low = BatchProcessor::with_options(
            4, false, AacEncoder::Native, true, 0, 8, RetryConfig::new(),
        );
        assert_eq!(low.max_concurrent_encodes, 1);

        let high = BatchProcessor::with_options(
            4, false, AacEncoder::Native, true, 100, 8, RetryConfig::new(),
        );
        assert_eq!(high.max_concurrent_encodes, 16);
    }

    #[test]
    fn test_recommended_workers() {
        let workers = BatchProcessor::recommended_workers();
        assert!((1..=8).contains(&workers));
    }

    #[tokio::test]
    async fn test_empty_batch() {
        // An empty input must short-circuit and return no results.
        let results = BatchProcessor::new(4)
            .process_batch(Vec::new(), Path::new("/tmp"), "auto")
            .await;
        assert!(results.is_empty());
    }
}