1use crate::audio::AacEncoder;
4use crate::core::{Processor, RetryConfig, smart_retry_async};
5use crate::models::{BookFolder, ProcessingResult};
6use anyhow::Result;
7use std::path::Path;
8use std::sync::Arc;
9use tokio::sync::{mpsc, Semaphore};
10
11pub struct BatchProcessor {
13 workers: usize,
15 keep_temp: bool,
17 encoder: AacEncoder,
19 enable_parallel_encoding: bool,
21 max_concurrent_encodes: usize,
23 max_concurrent_files: usize,
25 quality_preset: Option<String>,
27 retry_config: RetryConfig,
29}
30
31impl BatchProcessor {
32 pub fn new(workers: usize) -> Self {
34 Self {
35 workers: workers.clamp(1, 16),
36 keep_temp: false,
37 encoder: crate::audio::get_encoder(),
38 enable_parallel_encoding: true,
39 max_concurrent_encodes: 2, max_concurrent_files: 8, quality_preset: None,
42 retry_config: RetryConfig::new(),
43 }
44 }
45
46 pub fn with_options(
48 workers: usize,
49 keep_temp: bool,
50 encoder: AacEncoder,
51 enable_parallel_encoding: bool,
52 max_concurrent_encodes: usize,
53 max_concurrent_files: usize,
54 quality_preset: Option<String>,
55 retry_config: RetryConfig,
56 ) -> Self {
57 Self {
58 workers: workers.clamp(1, 16),
59 keep_temp,
60 encoder,
61 enable_parallel_encoding,
62 max_concurrent_encodes: max_concurrent_encodes.clamp(1, 16),
63 max_concurrent_files: max_concurrent_files.clamp(1, 32),
64 quality_preset,
65 retry_config,
66 }
67 }
68
69 pub async fn process_batch(
71 &self,
72 books: Vec<BookFolder>,
73 output_dir: &Path,
74 chapter_source: &str,
75 ) -> Vec<ProcessingResult> {
76 let total_books = books.len();
77
78 if total_books == 0 {
79 return Vec::new();
80 }
81
82 tracing::info!(
83 "Starting batch processing: {} books with {} workers (max {} concurrent encodes)",
84 total_books,
85 self.workers,
86 self.max_concurrent_encodes
87 );
88
89 let encode_semaphore = Arc::new(Semaphore::new(self.max_concurrent_encodes));
91
92 let (result_tx, mut result_rx) = mpsc::channel(total_books);
94
95 let mut handles = Vec::new();
97
98 for (index, book) in books.into_iter().enumerate() {
99 let result_tx = result_tx.clone();
100 let output_dir = output_dir.to_path_buf();
101 let chapter_source = chapter_source.to_string();
102 let keep_temp = self.keep_temp;
103 let encoder = self.encoder;
104 let enable_parallel_encoding = self.enable_parallel_encoding;
105 let max_concurrent_files = self.max_concurrent_files;
106 let quality_preset = self.quality_preset.clone();
107 let encode_semaphore = Arc::clone(&encode_semaphore);
108 let retry_config = self.retry_config.clone();
109
110 let handle = tokio::spawn(async move {
111 let _permit = encode_semaphore.acquire().await.unwrap();
113
114 tracing::info!(
115 "[{}/{}] Processing: {}",
116 index + 1,
117 total_books,
118 book.name
119 );
120
121 let result = smart_retry_async(&retry_config, || {
123 Self::process_single_book(
124 &book,
125 &output_dir,
126 &chapter_source,
127 keep_temp,
128 encoder,
129 enable_parallel_encoding,
130 max_concurrent_files,
131 quality_preset.clone(),
132 )
133 })
134 .await
135 .unwrap_or_else(|e| {
136 tracing::error!("✗ {}: {:?}", book.name, e);
138 ProcessingResult::new(book.name.clone())
139 .failure(format!("All retries failed: {:?}", e), 0.0)
140 });
141
142 let _ = result_tx.send(result).await;
144 });
145
146 handles.push(handle);
147 }
148
149 drop(result_tx);
151
152 let mut results = Vec::new();
154 while let Some(result) = result_rx.recv().await {
155 results.push(result);
156 }
157
158 for handle in handles {
160 let _ = handle.await;
161 }
162
163 tracing::info!(
164 "Batch processing complete: {}/{} successful",
165 results.iter().filter(|r| r.success).count(),
166 results.len()
167 );
168
169 results
170 }
171
172 async fn process_single_book(
174 book: &BookFolder,
175 output_dir: &Path,
176 chapter_source: &str,
177 keep_temp: bool,
178 encoder: AacEncoder,
179 enable_parallel_encoding: bool,
180 max_concurrent_files: usize,
181 quality_preset: Option<String>,
182 ) -> Result<ProcessingResult> {
183 let processor = Processor::with_options(
184 keep_temp,
185 encoder,
186 enable_parallel_encoding,
187 max_concurrent_files,
188 quality_preset,
189 )?;
190
191 let result = processor
192 .process_book(book, output_dir, chapter_source)
193 .await?;
194
195 tracing::info!("✓ {}: {:.1}s", book.name, result.processing_time);
196 Ok(result)
197 }
198
199 pub fn recommended_workers() -> usize {
201 let cpu_count = num_cpus::get();
202
203 (cpu_count / 2).max(1).min(8)
206 }
207}
208
209impl Default for BatchProcessor {
210 fn default() -> Self {
211 Self::new(Self::recommended_workers())
212 }
213}
214
215#[cfg(test)]
216mod tests {
217 use super::*;
218
219 #[test]
220 fn test_batch_processor_creation() {
221 let processor = BatchProcessor::new(4);
222 assert_eq!(processor.workers, 4);
223 assert_eq!(processor.max_concurrent_encodes, 2);
224 assert!(!processor.keep_temp);
225 assert!(matches!(processor.encoder, AacEncoder::AppleSilicon | AacEncoder::LibFdk | AacEncoder::Native));
227 }
228
229 #[test]
230 fn test_batch_processor_with_options() {
231 let processor = BatchProcessor::with_options(8, true, AacEncoder::AppleSilicon, true, 4, 8, None, RetryConfig::new());
232 assert_eq!(processor.workers, 8);
233 assert_eq!(processor.max_concurrent_encodes, 4);
234 assert_eq!(processor.max_concurrent_files, 8);
235 assert!(processor.keep_temp);
236 assert_eq!(processor.encoder, AacEncoder::AppleSilicon);
237 }
238
239 #[test]
240 fn test_worker_clamping() {
241 let processor = BatchProcessor::new(0);
243 assert_eq!(processor.workers, 1);
244
245 let processor = BatchProcessor::new(100);
247 assert_eq!(processor.workers, 16);
248 }
249
250 #[test]
251 fn test_concurrent_encode_clamping() {
252 let processor = BatchProcessor::with_options(4, false, AacEncoder::Native, true, 0, 8, None, RetryConfig::new());
253 assert_eq!(processor.max_concurrent_encodes, 1);
254
255 let processor = BatchProcessor::with_options(4, false, AacEncoder::Native, true, 100, 8, None, RetryConfig::new());
256 assert_eq!(processor.max_concurrent_encodes, 16);
257 }
258
259 #[test]
260 fn test_recommended_workers() {
261 let workers = BatchProcessor::recommended_workers();
262 assert!(workers >= 1);
263 assert!(workers <= 8);
264 }
265
266 #[tokio::test]
267 async fn test_empty_batch() {
268 let processor = BatchProcessor::new(4);
269 let results = processor
270 .process_batch(Vec::new(), Path::new("/tmp"), "auto")
271 .await;
272 assert_eq!(results.len(), 0);
273 }
274}