1use crate::core::{Processor, RetryConfig, smart_retry_async};
4use crate::models::{BookFolder, ProcessingResult};
5use anyhow::Result;
6use std::path::Path;
7use std::sync::Arc;
8use tokio::sync::{mpsc, Semaphore};
9
/// Processes a batch of audiobook folders concurrently.
///
/// Concurrency is bounded in `process_batch` by a semaphore sized from
/// `max_concurrent_encodes`; per-book failures are retried per `retry_config`.
pub struct BatchProcessor {
    // Requested worker count, clamped to 1..=16 by the constructors.
    // NOTE(review): in `process_batch` this is only reported in the log line;
    // actual parallelism is governed by `max_concurrent_encodes` — confirm
    // whether `workers` is consumed elsewhere.
    workers: usize,
    // Forwarded to `Processor::with_options` — presumably retains temporary
    // files when true; verify against `Processor`.
    keep_temp: bool,
    // Forwarded to `Processor::with_options` (hardware-acceleration toggle).
    use_apple_silicon: bool,
    // Forwarded to `Processor::with_options`.
    enable_parallel_encoding: bool,
    // Maximum books encoded simultaneously (semaphore permits), clamped to 1..=16.
    max_concurrent_encodes: usize,
    // Forwarded to `Processor::with_options`, clamped to 1..=32.
    max_concurrent_files: usize,
    // Retry policy applied around each book via `smart_retry_async`.
    retry_config: RetryConfig,
}
27
28impl BatchProcessor {
29 pub fn new(workers: usize) -> Self {
31 Self {
32 workers: workers.clamp(1, 16),
33 keep_temp: false,
34 use_apple_silicon: false,
35 enable_parallel_encoding: true,
36 max_concurrent_encodes: 2, max_concurrent_files: 8, retry_config: RetryConfig::new(),
39 }
40 }
41
42 pub fn with_options(
44 workers: usize,
45 keep_temp: bool,
46 use_apple_silicon: bool,
47 enable_parallel_encoding: bool,
48 max_concurrent_encodes: usize,
49 max_concurrent_files: usize,
50 retry_config: RetryConfig,
51 ) -> Self {
52 Self {
53 workers: workers.clamp(1, 16),
54 keep_temp,
55 use_apple_silicon,
56 enable_parallel_encoding,
57 max_concurrent_encodes: max_concurrent_encodes.clamp(1, 16),
58 max_concurrent_files: max_concurrent_files.clamp(1, 32),
59 retry_config,
60 }
61 }
62
63 pub async fn process_batch(
65 &self,
66 books: Vec<BookFolder>,
67 output_dir: &Path,
68 chapter_source: &str,
69 ) -> Vec<ProcessingResult> {
70 let total_books = books.len();
71
72 if total_books == 0 {
73 return Vec::new();
74 }
75
76 tracing::info!(
77 "Starting batch processing: {} books with {} workers (max {} concurrent encodes)",
78 total_books,
79 self.workers,
80 self.max_concurrent_encodes
81 );
82
83 let encode_semaphore = Arc::new(Semaphore::new(self.max_concurrent_encodes));
85
86 let (result_tx, mut result_rx) = mpsc::channel(total_books);
88
89 let mut handles = Vec::new();
91
92 for (index, book) in books.into_iter().enumerate() {
93 let result_tx = result_tx.clone();
94 let output_dir = output_dir.to_path_buf();
95 let chapter_source = chapter_source.to_string();
96 let keep_temp = self.keep_temp;
97 let use_apple_silicon = self.use_apple_silicon;
98 let enable_parallel_encoding = self.enable_parallel_encoding;
99 let max_concurrent_files = self.max_concurrent_files;
100 let encode_semaphore = Arc::clone(&encode_semaphore);
101 let retry_config = self.retry_config.clone();
102
103 let handle = tokio::spawn(async move {
104 let _permit = encode_semaphore.acquire().await.unwrap();
106
107 tracing::info!(
108 "[{}/{}] Processing: {}",
109 index + 1,
110 total_books,
111 book.name
112 );
113
114 let result = smart_retry_async(&retry_config, || {
116 Self::process_single_book(
117 &book,
118 &output_dir,
119 &chapter_source,
120 keep_temp,
121 use_apple_silicon,
122 enable_parallel_encoding,
123 max_concurrent_files,
124 )
125 })
126 .await
127 .unwrap_or_else(|e| {
128 tracing::error!("✗ {}: {}", book.name, e);
130 ProcessingResult::new(book.name.clone())
131 .failure(format!("All retries failed: {}", e), 0.0)
132 });
133
134 let _ = result_tx.send(result).await;
136 });
137
138 handles.push(handle);
139 }
140
141 drop(result_tx);
143
144 let mut results = Vec::new();
146 while let Some(result) = result_rx.recv().await {
147 results.push(result);
148 }
149
150 for handle in handles {
152 let _ = handle.await;
153 }
154
155 tracing::info!(
156 "Batch processing complete: {}/{} successful",
157 results.iter().filter(|r| r.success).count(),
158 results.len()
159 );
160
161 results
162 }
163
164 async fn process_single_book(
166 book: &BookFolder,
167 output_dir: &Path,
168 chapter_source: &str,
169 keep_temp: bool,
170 use_apple_silicon: bool,
171 enable_parallel_encoding: bool,
172 max_concurrent_files: usize,
173 ) -> Result<ProcessingResult> {
174 let processor = Processor::with_options(
175 keep_temp,
176 use_apple_silicon,
177 enable_parallel_encoding,
178 max_concurrent_files,
179 )?;
180
181 let result = processor
182 .process_book(book, output_dir, chapter_source)
183 .await?;
184
185 tracing::info!("✓ {}: {:.1}s", book.name, result.processing_time);
186 Ok(result)
187 }
188
189 pub fn recommended_workers() -> usize {
191 let cpu_count = num_cpus::get();
192
193 (cpu_count / 2).max(1).min(8)
196 }
197}
198
199impl Default for BatchProcessor {
200 fn default() -> Self {
201 Self::new(Self::recommended_workers())
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    // Defaults: 2 concurrent encodes, temp files discarded, no Apple Silicon.
    #[test]
    fn test_batch_processor_creation() {
        let p = BatchProcessor::new(4);
        assert_eq!(p.workers, 4);
        assert_eq!(p.max_concurrent_encodes, 2);
        assert!(!p.keep_temp);
        assert!(!p.use_apple_silicon);
    }

    // Explicit options are stored as given (all within their clamp ranges).
    #[test]
    fn test_batch_processor_with_options() {
        let p = BatchProcessor::with_options(8, true, true, true, 4, 8, RetryConfig::new());
        assert_eq!(p.workers, 8);
        assert_eq!(p.max_concurrent_encodes, 4);
        assert_eq!(p.max_concurrent_files, 8);
        assert!(p.keep_temp);
        assert!(p.use_apple_silicon);
    }

    // Worker counts are clamped into 1..=16 at both ends.
    #[test]
    fn test_worker_clamping() {
        assert_eq!(BatchProcessor::new(0).workers, 1);
        assert_eq!(BatchProcessor::new(100).workers, 16);
    }

    // Concurrent-encode counts are clamped into 1..=16 at both ends.
    #[test]
    fn test_concurrent_encode_clamping() {
        let low = BatchProcessor::with_options(4, false, false, true, 0, 8, RetryConfig::new());
        assert_eq!(low.max_concurrent_encodes, 1);

        let high = BatchProcessor::with_options(4, false, false, true, 100, 8, RetryConfig::new());
        assert_eq!(high.max_concurrent_encodes, 16);
    }

    // The recommendation always lands in 1..=8 regardless of host CPU count.
    #[test]
    fn test_recommended_workers() {
        let workers = BatchProcessor::recommended_workers();
        assert!((1..=8).contains(&workers));
    }

    // An empty batch short-circuits without spawning any tasks.
    #[tokio::test]
    async fn test_empty_batch() {
        let results = BatchProcessor::new(4)
            .process_batch(Vec::new(), Path::new("/tmp"), "auto")
            .await;
        assert!(results.is_empty());
    }
}