1use crate::core::{Processor, RetryConfig, smart_retry_async};
4use crate::models::{BookFolder, ProcessingResult};
5use anyhow::Result;
6use std::path::Path;
7use std::sync::Arc;
8use tokio::sync::{mpsc, Semaphore};
9
/// Drives concurrent processing of a batch of `BookFolder`s, fanning each
/// book out to a Tokio task and collecting `ProcessingResult`s.
pub struct BatchProcessor {
    // Configured worker count; clamped to 1..=16 by both constructors.
    workers: usize,
    // When true, intermediate/temporary files are kept after processing.
    keep_temp: bool,
    // Toggle forwarded to `Processor::with_options` — presumably enables
    // Apple Silicon hardware-accelerated encoding; confirm in `Processor`.
    use_apple_silicon: bool,
    // Toggle forwarded to `Processor::with_options` for per-book parallel encoding.
    enable_parallel_encoding: bool,
    // Upper bound on simultaneously running book tasks (semaphore permits);
    // clamped to 1..=16 by `with_options`, defaults to 2 in `new`.
    max_concurrent_encodes: usize,
    // Retry policy applied to each book via `smart_retry_async`.
    retry_config: RetryConfig,
}
25
26impl BatchProcessor {
27 pub fn new(workers: usize) -> Self {
29 Self {
30 workers: workers.clamp(1, 16),
31 keep_temp: false,
32 use_apple_silicon: false,
33 enable_parallel_encoding: true,
34 max_concurrent_encodes: 2, retry_config: RetryConfig::new(),
36 }
37 }
38
39 pub fn with_options(
41 workers: usize,
42 keep_temp: bool,
43 use_apple_silicon: bool,
44 enable_parallel_encoding: bool,
45 max_concurrent_encodes: usize,
46 retry_config: RetryConfig,
47 ) -> Self {
48 Self {
49 workers: workers.clamp(1, 16),
50 keep_temp,
51 use_apple_silicon,
52 enable_parallel_encoding,
53 max_concurrent_encodes: max_concurrent_encodes.clamp(1, 16),
54 retry_config,
55 }
56 }
57
58 pub async fn process_batch(
60 &self,
61 books: Vec<BookFolder>,
62 output_dir: &Path,
63 chapter_source: &str,
64 ) -> Vec<ProcessingResult> {
65 let total_books = books.len();
66
67 if total_books == 0 {
68 return Vec::new();
69 }
70
71 tracing::info!(
72 "Starting batch processing: {} books with {} workers (max {} concurrent encodes)",
73 total_books,
74 self.workers,
75 self.max_concurrent_encodes
76 );
77
78 let encode_semaphore = Arc::new(Semaphore::new(self.max_concurrent_encodes));
80
81 let (result_tx, mut result_rx) = mpsc::channel(total_books);
83
84 let mut handles = Vec::new();
86
87 for (index, book) in books.into_iter().enumerate() {
88 let result_tx = result_tx.clone();
89 let output_dir = output_dir.to_path_buf();
90 let chapter_source = chapter_source.to_string();
91 let keep_temp = self.keep_temp;
92 let use_apple_silicon = self.use_apple_silicon;
93 let enable_parallel_encoding = self.enable_parallel_encoding;
94 let encode_semaphore = Arc::clone(&encode_semaphore);
95 let retry_config = self.retry_config.clone();
96
97 let handle = tokio::spawn(async move {
98 let _permit = encode_semaphore.acquire().await.unwrap();
100
101 tracing::info!(
102 "[{}/{}] Processing: {}",
103 index + 1,
104 total_books,
105 book.name
106 );
107
108 let result = smart_retry_async(&retry_config, || {
110 Self::process_single_book(
111 &book,
112 &output_dir,
113 &chapter_source,
114 keep_temp,
115 use_apple_silicon,
116 enable_parallel_encoding,
117 )
118 })
119 .await
120 .unwrap_or_else(|e| {
121 tracing::error!("✗ {}: {}", book.name, e);
123 ProcessingResult::new(book.name.clone())
124 .failure(format!("All retries failed: {}", e), 0.0)
125 });
126
127 let _ = result_tx.send(result).await;
129 });
130
131 handles.push(handle);
132 }
133
134 drop(result_tx);
136
137 let mut results = Vec::new();
139 while let Some(result) = result_rx.recv().await {
140 results.push(result);
141 }
142
143 for handle in handles {
145 let _ = handle.await;
146 }
147
148 tracing::info!(
149 "Batch processing complete: {}/{} successful",
150 results.iter().filter(|r| r.success).count(),
151 results.len()
152 );
153
154 results
155 }
156
157 async fn process_single_book(
159 book: &BookFolder,
160 output_dir: &Path,
161 chapter_source: &str,
162 keep_temp: bool,
163 use_apple_silicon: bool,
164 enable_parallel_encoding: bool,
165 ) -> Result<ProcessingResult> {
166 let processor = Processor::with_options(keep_temp, use_apple_silicon, enable_parallel_encoding)?;
167
168 let result = processor
169 .process_book(book, output_dir, chapter_source)
170 .await?;
171
172 tracing::info!("✓ {}: {:.1}s", book.name, result.processing_time);
173 Ok(result)
174 }
175
176 pub fn recommended_workers() -> usize {
178 let cpu_count = num_cpus::get();
179
180 (cpu_count / 2).max(1).min(8)
183 }
184}
185
186impl Default for BatchProcessor {
187 fn default() -> Self {
188 Self::new(Self::recommended_workers())
189 }
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    // `new` applies the documented defaults.
    #[test]
    fn test_batch_processor_creation() {
        let bp = BatchProcessor::new(4);
        assert_eq!(bp.workers, 4);
        assert_eq!(bp.max_concurrent_encodes, 2);
        assert!(!bp.keep_temp);
        assert!(!bp.use_apple_silicon);
    }

    // `with_options` stores every explicitly supplied option.
    #[test]
    fn test_batch_processor_with_options() {
        let bp = BatchProcessor::with_options(8, true, true, true, 4, RetryConfig::new());
        assert_eq!(bp.workers, 8);
        assert_eq!(bp.max_concurrent_encodes, 4);
        assert!(bp.keep_temp);
        assert!(bp.use_apple_silicon);
    }

    // Worker counts are clamped into 1..=16 at both extremes.
    #[test]
    fn test_worker_clamping() {
        assert_eq!(BatchProcessor::new(0).workers, 1);
        assert_eq!(BatchProcessor::new(100).workers, 16);
    }

    // Concurrent-encode counts are clamped into 1..=16 at both extremes.
    #[test]
    fn test_concurrent_encode_clamping() {
        let low = BatchProcessor::with_options(4, false, false, true, 0, RetryConfig::new());
        assert_eq!(low.max_concurrent_encodes, 1);

        let high = BatchProcessor::with_options(4, false, false, true, 100, RetryConfig::new());
        assert_eq!(high.max_concurrent_encodes, 16);
    }

    // The recommendation always lands in the documented 1..=8 range.
    #[test]
    fn test_recommended_workers() {
        let suggested = BatchProcessor::recommended_workers();
        assert!((1..=8).contains(&suggested));
    }

    // An empty batch short-circuits to an empty result list.
    #[tokio::test]
    async fn test_empty_batch() {
        let bp = BatchProcessor::new(4);
        let results = bp.process_batch(Vec::new(), Path::new("/tmp"), "auto").await;
        assert_eq!(results.len(), 0);
    }
}
249}