use crate::audio::AacEncoder;
use crate::core::{Processor, RetryConfig, smart_retry_async};
use crate::models::{BookFolder, ProcessingResult};
use anyhow::Result;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::{mpsc, Semaphore};
/// Orchestrates processing of many audiobook folders concurrently.
///
/// Concurrency is enforced by a semaphore sized to `max_concurrent_encodes`
/// (see `process_batch`); `workers` is clamped on construction but is only
/// reported in log output within this file.
pub struct BatchProcessor {
// Requested worker count, clamped to [1, 16] by the constructors.
workers: usize,
// When true, temporary files produced during processing are retained.
keep_temp: bool,
// AAC encoder backend handed to each per-book `Processor`.
encoder: AacEncoder,
// Enables parallel encoding inside a single book's `Processor`.
enable_parallel_encoding: bool,
// Upper bound on books being encoded at once (semaphore size), clamped to [1, 16].
max_concurrent_encodes: usize,
// Per-book limit on concurrently processed files, clamped to [1, 32].
max_concurrent_files: usize,
// Optional encoder quality preset; `None` uses the encoder default.
quality_preset: Option<String>,
// Retry policy applied around each book via `smart_retry_async`.
retry_config: RetryConfig,
}
impl BatchProcessor {
    /// Creates a processor with `workers` clamped to `[1, 16]` and defaults
    /// for everything else: platform-default encoder, parallel encoding on,
    /// 2 concurrent encodes, 8 concurrent files, no quality preset, and the
    /// default retry policy.
    pub fn new(workers: usize) -> Self {
        Self {
            workers: workers.clamp(1, 16),
            keep_temp: false,
            encoder: crate::audio::get_encoder(),
            enable_parallel_encoding: true,
            max_concurrent_encodes: 2,
            max_concurrent_files: 8,
            quality_preset: None,
            retry_config: RetryConfig::new(),
        }
    }

    /// Fully parameterized constructor.
    ///
    /// Clamping: `workers` to `[1, 16]`, `max_concurrent_encodes` to
    /// `[1, 16]`, `max_concurrent_files` to `[1, 32]`.
    pub fn with_options(
        workers: usize,
        keep_temp: bool,
        encoder: AacEncoder,
        enable_parallel_encoding: bool,
        max_concurrent_encodes: usize,
        max_concurrent_files: usize,
        quality_preset: Option<String>,
        retry_config: RetryConfig,
    ) -> Self {
        Self {
            workers: workers.clamp(1, 16),
            keep_temp,
            encoder,
            enable_parallel_encoding,
            max_concurrent_encodes: max_concurrent_encodes.clamp(1, 16),
            max_concurrent_files: max_concurrent_files.clamp(1, 32),
            quality_preset,
            retry_config,
        }
    }

    /// Processes every book in `books`, writing output under `output_dir`.
    ///
    /// One tokio task is spawned per book; a semaphore limits actual
    /// concurrency to `max_concurrent_encodes` (the permit is held for the
    /// whole book, including retries). Each book is retried per
    /// `retry_config`; a book whose retries are exhausted yields a failure
    /// `ProcessingResult` rather than aborting the batch.
    ///
    /// Returns one `ProcessingResult` per book, in **completion order**, not
    /// input order. An empty `books` returns an empty `Vec` immediately.
    ///
    /// NOTE(review): `self.workers` is only used in the log line here; the
    /// effective parallelism is `max_concurrent_encodes` — confirm intent.
    pub async fn process_batch(
        &self,
        books: Vec<BookFolder>,
        output_dir: &Path,
        chapter_source: &str,
    ) -> Vec<ProcessingResult> {
        let total_books = books.len();
        if total_books == 0 {
            return Vec::new();
        }
        tracing::info!(
            "Starting batch processing: {} books with {} workers (max {} concurrent encodes)",
            total_books,
            self.workers,
            self.max_concurrent_encodes
        );
        let encode_semaphore = Arc::new(Semaphore::new(self.max_concurrent_encodes));
        // Channel capacity equals the book count, so sends never block even if
        // the receiver lags.
        let (result_tx, mut result_rx) = mpsc::channel(total_books);
        let mut handles = Vec::new();
        for (index, book) in books.into_iter().enumerate() {
            // Clone everything the spawned task needs so it is 'static.
            let result_tx = result_tx.clone();
            let output_dir = output_dir.to_path_buf();
            let chapter_source = chapter_source.to_string();
            let keep_temp = self.keep_temp;
            let encoder = self.encoder;
            let enable_parallel_encoding = self.enable_parallel_encoding;
            let max_concurrent_files = self.max_concurrent_files;
            let quality_preset = self.quality_preset.clone();
            let encode_semaphore = Arc::clone(&encode_semaphore);
            let retry_config = self.retry_config.clone();
            let handle = tokio::spawn(async move {
                // The semaphore is never closed, so acquire can only fail if
                // it were dropped while closed — a bug, hence `expect`.
                let _permit = encode_semaphore
                    .acquire()
                    .await
                    .expect("encode semaphore closed unexpectedly");
                tracing::info!(
                    "[{}/{}] Processing: {}",
                    index + 1,
                    total_books,
                    book.name
                );
                let result = smart_retry_async(&retry_config, || {
                    Self::process_single_book(
                        &book,
                        &output_dir,
                        &chapter_source,
                        keep_temp,
                        encoder,
                        enable_parallel_encoding,
                        max_concurrent_files,
                        quality_preset.clone(),
                    )
                })
                .await
                // Exhausted retries are converted into a failure result so the
                // rest of the batch keeps going.
                .unwrap_or_else(|e| {
                    tracing::error!("✗ {}: {:?}", book.name, e);
                    ProcessingResult::new(book.name.clone())
                        .failure(format!("All retries failed: {:?}", e), 0.0)
                });
                // The receiver may only be gone if the caller dropped the
                // future; ignore that send error.
                let _ = result_tx.send(result).await;
            });
            handles.push(handle);
        }
        // Drop our copy so the channel closes once all tasks finish.
        drop(result_tx);
        let mut results = Vec::new();
        while let Some(result) = result_rx.recv().await {
            results.push(result);
        }
        // Reap task handles; panics in tasks are swallowed here because each
        // failure has already been reported through the channel or the log.
        for handle in handles {
            let _ = handle.await;
        }
        tracing::info!(
            "Batch processing complete: {}/{} successful",
            results.iter().filter(|r| r.success).count(),
            results.len()
        );
        results
    }

    /// Processes one book with a freshly constructed `Processor`.
    ///
    /// # Errors
    /// Propagates failures from `Processor::with_options` and
    /// `Processor::process_book`; callers (the retry wrapper above) decide
    /// whether to retry.
    async fn process_single_book(
        book: &BookFolder,
        output_dir: &Path,
        chapter_source: &str,
        keep_temp: bool,
        encoder: AacEncoder,
        enable_parallel_encoding: bool,
        max_concurrent_files: usize,
        quality_preset: Option<String>,
    ) -> Result<ProcessingResult> {
        let processor = Processor::with_options(
            keep_temp,
            encoder,
            enable_parallel_encoding,
            max_concurrent_files,
            quality_preset,
        )?;
        let result = processor
            .process_book(book, output_dir, chapter_source)
            .await?;
        tracing::info!("✓ {}: {:.1}s", book.name, result.processing_time);
        Ok(result)
    }

    /// Suggests a worker count: half the logical CPUs, clamped to `[1, 8]`.
    pub fn recommended_workers() -> usize {
        // `.clamp(1, 8)` replaces the former `.max(1).min(8)` chain
        // (clippy::manual_clamp); behavior is identical.
        (num_cpus::get() / 2).clamp(1, 8)
    }
}
impl Default for BatchProcessor {
    /// Builds a processor sized from the machine's CPU count via
    /// `recommended_workers`.
    fn default() -> Self {
        let workers = Self::recommended_workers();
        Self::new(workers)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_batch_processor_creation() {
        // `new` applies the documented defaults.
        let p = BatchProcessor::new(4);
        assert_eq!(p.workers, 4);
        assert_eq!(p.max_concurrent_encodes, 2);
        assert!(!p.keep_temp);
        assert!(matches!(
            p.encoder,
            AacEncoder::AppleSilicon | AacEncoder::LibFdk | AacEncoder::Native
        ));
    }

    #[test]
    fn test_batch_processor_with_options() {
        // Every field handed to `with_options` lands on the struct.
        let p = BatchProcessor::with_options(
            8,
            true,
            AacEncoder::AppleSilicon,
            true,
            4,
            8,
            None,
            RetryConfig::new(),
        );
        assert_eq!(p.workers, 8);
        assert_eq!(p.max_concurrent_encodes, 4);
        assert_eq!(p.max_concurrent_files, 8);
        assert!(p.keep_temp);
        assert_eq!(p.encoder, AacEncoder::AppleSilicon);
    }

    #[test]
    fn test_worker_clamping() {
        // Below and above the [1, 16] range both get clamped.
        assert_eq!(BatchProcessor::new(0).workers, 1);
        assert_eq!(BatchProcessor::new(100).workers, 16);
    }

    #[test]
    fn test_concurrent_encode_clamping() {
        // `max_concurrent_encodes` is clamped to [1, 16] as well.
        let low = BatchProcessor::with_options(
            4,
            false,
            AacEncoder::Native,
            true,
            0,
            8,
            None,
            RetryConfig::new(),
        );
        assert_eq!(low.max_concurrent_encodes, 1);

        let high = BatchProcessor::with_options(
            4,
            false,
            AacEncoder::Native,
            true,
            100,
            8,
            None,
            RetryConfig::new(),
        );
        assert_eq!(high.max_concurrent_encodes, 16);
    }

    #[test]
    fn test_recommended_workers() {
        // Whatever the host's CPU count, the suggestion stays in [1, 8].
        let n = BatchProcessor::recommended_workers();
        assert!((1..=8).contains(&n));
    }

    #[tokio::test]
    async fn test_empty_batch() {
        // An empty input short-circuits to an empty result set.
        let results = BatchProcessor::new(4)
            .process_batch(Vec::new(), Path::new("/tmp"), "auto")
            .await;
        assert!(results.is_empty());
    }
}