use super::config::BatchConfig;
use super::results::{BatchResults, FileResult};
use super::traits::{BatchOperation, StreamingBatchOperation};
use crate::error::CliError;
use colored::Colorize;
use rayon::prelude::*;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
/// Thread-safe progress state shared by worker threads while a batch runs:
/// lock-free counters plus the reporting configuration captured at start.
#[derive(Debug)]
struct ProgressTracker {
    // Total number of files in the batch (fixed at construction).
    total: usize,
    // Files finished so far (successes + failures).
    processed: AtomicUsize,
    // Files whose operation returned Ok.
    succeeded: AtomicUsize,
    // Files whose operation returned Err.
    failed: AtomicUsize,
    // Emit a progress line every `interval` completions; 0 disables reports.
    interval: usize,
    // When true, print per-file status lines instead of aggregate progress.
    verbose: bool,
    // Batch start time; used to compute throughput.
    start_time: Instant,
}
impl ProgressTracker {
    /// Creates a tracker for a batch of `total` files that reports every
    /// `interval` completions (`interval == 0` disables periodic reports).
    fn new(total: usize, interval: usize, verbose: bool) -> Self {
        Self {
            total,
            processed: AtomicUsize::new(0),
            succeeded: AtomicUsize::new(0),
            failed: AtomicUsize::new(0),
            interval,
            verbose,
            start_time: Instant::now(),
        }
    }

    /// Counts one successful file and prints progress when a report is due.
    fn record_success(&self, path: &Path) {
        let processed = self.processed.fetch_add(1, Ordering::Relaxed) + 1;
        self.succeeded.fetch_add(1, Ordering::Relaxed);
        if self.should_report(processed) {
            self.report_progress(path, true);
        }
    }

    /// Counts one failed file, prints the error immediately in verbose mode,
    /// and prints progress when a report is due.
    fn record_failure(&self, path: &Path, error: &CliError) {
        let processed = self.processed.fetch_add(1, Ordering::Relaxed) + 1;
        self.failed.fetch_add(1, Ordering::Relaxed);
        if self.verbose {
            eprintln!("{} {} - {}", "✗".red().bold(), path.display(), error);
        }
        if self.should_report(processed) {
            self.report_progress(path, false);
        }
    }

    /// A report is due every `interval` files and on the final file;
    /// `interval == 0` disables reporting entirely.
    fn should_report(&self, processed: usize) -> bool {
        self.interval > 0 && (processed % self.interval == 0 || processed == self.total)
    }

    /// Throughput in files/s, clamped to 0.0 when no measurable time has
    /// elapsed so we never format `inf`/`NaN` into the output.
    fn throughput(&self, processed: usize) -> f64 {
        let secs = self.start_time.elapsed().as_secs_f64();
        if secs > 0.0 {
            processed as f64 / secs
        } else {
            0.0
        }
    }

    /// Prints one progress line to stderr: a per-file status line in verbose
    /// mode, an aggregate counts line otherwise.
    fn report_progress(&self, current_file: &Path, success: bool) {
        let processed = self.processed.load(Ordering::Relaxed);
        let succeeded = self.succeeded.load(Ordering::Relaxed);
        let failed = self.failed.load(Ordering::Relaxed);
        let rate = self.throughput(processed);
        if self.verbose {
            let status = if success {
                "✓".green().bold()
            } else {
                "✗".red().bold()
            };
            eprintln!(
                "{} [{}/{}] {} ({:.1} files/s)",
                status,
                processed,
                self.total,
                current_file.display(),
                rate
            );
        } else {
            eprintln!(
                "Progress: [{}/{}] {} succeeded, {} failed ({:.1} files/s)",
                processed, self.total, succeeded, failed, rate
            );
        }
    }

    /// Prints the final banner (totals, elapsed time, throughput) to stdout.
    fn print_summary(&self, operation_name: &str) {
        let processed = self.processed.load(Ordering::Relaxed);
        let succeeded = self.succeeded.load(Ordering::Relaxed);
        let failed = self.failed.load(Ordering::Relaxed);
        let elapsed = self.start_time.elapsed();
        println!();
        println!("{}", "═".repeat(60).bright_blue());
        println!(
            "{} {}",
            "Batch Operation:".bright_blue().bold(),
            operation_name.bright_white()
        );
        println!("{}", "═".repeat(60).bright_blue());
        println!(
            " {} {}",
            "Total files:".bright_cyan(),
            processed.to_string().bright_white()
        );
        println!(
            " {} {}",
            "Succeeded:".green().bold(),
            succeeded.to_string().bright_white()
        );
        println!(
            " {} {}",
            "Failed:".red().bold(),
            failed.to_string().bright_white()
        );
        println!(
            " {} {:.2}s",
            "Elapsed:".bright_cyan(),
            elapsed.as_secs_f64()
        );
        println!(
            " {} {:.1} files/s",
            "Throughput:".bright_cyan(),
            self.throughput(processed)
        );
        println!("{}", "═".repeat(60).bright_blue());
    }
}
/// Entry point for running a `BatchOperation` (or streaming variant) over a
/// set of files, honoring the thresholds and limits in `BatchConfig`.
#[derive(Debug, Clone)]
pub struct BatchExecutor {
    // Tuning knobs read here: parallel_threshold, max_threads,
    // progress_interval, verbose.
    config: BatchConfig,
}
impl BatchExecutor {
#[must_use]
pub fn new(config: BatchConfig) -> Self {
Self { config }
}
#[must_use]
pub fn default_config() -> Self {
Self::new(BatchConfig::default())
}
pub fn process<O>(
&self,
files: &[PathBuf],
operation: O,
show_progress: bool,
) -> Result<BatchResults<O::Output>, CliError>
where
O: BatchOperation,
{
let start_time = Instant::now();
if files.is_empty() {
if show_progress {
eprintln!(
"{} No files matched the provided patterns",
"Warning:".yellow().bold()
);
eprintln!(
"{} Check that patterns are correct and files exist",
"Hint:".cyan()
);
}
return Ok(BatchResults::new(vec![], 0));
}
let results = if files.len() < self.config.parallel_threshold {
self.process_serial(files, &operation, show_progress)
} else if let Some(max_threads) = self.config.max_threads {
self.process_with_local_pool(files, &operation, show_progress, max_threads)?
} else {
self.process_parallel(files, &operation, show_progress)
};
let elapsed_ms = start_time.elapsed().as_millis();
Ok(BatchResults::new(results, elapsed_ms))
}
fn process_with_local_pool<O>(
&self,
files: &[PathBuf],
operation: &O,
show_progress: bool,
num_threads: usize,
) -> Result<Vec<FileResult<O::Output>>, CliError>
where
O: BatchOperation,
{
if num_threads == 0 {
return Err(CliError::thread_pool_error(
"Cannot create thread pool with 0 threads".to_string(),
num_threads,
));
}
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.build()
.map_err(|e| {
CliError::thread_pool_error(
format!("Failed to create thread pool with {num_threads} threads: {e}"),
num_threads,
)
})?;
let results = pool.install(|| self.process_parallel(files, operation, show_progress));
Ok(results)
}
fn process_serial<O>(
&self,
files: &[PathBuf],
operation: &O,
show_progress: bool,
) -> Vec<FileResult<O::Output>>
where
O: BatchOperation,
{
let tracker = if show_progress {
Some(ProgressTracker::new(
files.len(),
self.config.progress_interval,
self.config.verbose,
))
} else {
None
};
let results: Vec<FileResult<O::Output>> = files
.iter()
.map(|path| {
let result = operation.process_file(path);
if let Some(ref t) = tracker {
match &result {
Ok(_) => t.record_success(path),
Err(e) => t.record_failure(path, e),
}
}
FileResult {
path: path.clone(),
result: result.map_err(|e| e.clone()),
}
})
.collect();
if show_progress {
if let Some(tracker) = tracker {
tracker.print_summary(operation.name());
}
}
results
}
fn process_parallel<O>(
&self,
files: &[PathBuf],
operation: &O,
show_progress: bool,
) -> Vec<FileResult<O::Output>>
where
O: BatchOperation,
{
let tracker = if show_progress {
Some(Arc::new(ProgressTracker::new(
files.len(),
self.config.progress_interval,
self.config.verbose,
)))
} else {
None
};
let results: Vec<FileResult<O::Output>> = files
.par_iter()
.map(|path| {
let result = operation.process_file(path);
if let Some(ref t) = tracker {
match &result {
Ok(_) => t.record_success(path),
Err(e) => t.record_failure(path, e),
}
}
FileResult {
path: path.clone(),
result: result.map_err(|e| e.clone()),
}
})
.collect();
if show_progress {
if let Some(tracker) = tracker {
tracker.print_summary(operation.name());
}
}
results
}
pub fn process_streaming<O>(
&self,
files: &[PathBuf],
operation: O,
show_progress: bool,
) -> Result<BatchResults<O::Output>, CliError>
where
O: StreamingBatchOperation,
{
let start_time = Instant::now();
if files.is_empty() {
return Ok(BatchResults::new(vec![], 0));
}
if let Some(max_threads) = self.config.max_threads {
rayon::ThreadPoolBuilder::new()
.num_threads(max_threads)
.build_global()
.ok(); }
let results = if files.len() < self.config.parallel_threshold {
self.process_streaming_serial(files, &operation, show_progress)
} else {
self.process_streaming_parallel(files, &operation, show_progress)
};
let elapsed_ms = start_time.elapsed().as_millis();
Ok(BatchResults::new(results, elapsed_ms))
}
fn process_streaming_serial<O>(
&self,
files: &[PathBuf],
operation: &O,
show_progress: bool,
) -> Vec<FileResult<O::Output>>
where
O: StreamingBatchOperation,
{
let tracker = if show_progress {
Some(ProgressTracker::new(
files.len(),
self.config.progress_interval,
self.config.verbose,
))
} else {
None
};
let results: Vec<FileResult<O::Output>> = files
.iter()
.map(|path| {
let result = operation.process_file_streaming(path);
if let Some(ref t) = tracker {
match &result {
Ok(_) => t.record_success(path),
Err(e) => t.record_failure(path, e),
}
}
FileResult {
path: path.clone(),
result: result.map_err(|e| e.clone()),
}
})
.collect();
if show_progress {
if let Some(tracker) = tracker {
tracker.print_summary(operation.name());
}
}
results
}
fn process_streaming_parallel<O>(
&self,
files: &[PathBuf],
operation: &O,
show_progress: bool,
) -> Vec<FileResult<O::Output>>
where
O: StreamingBatchOperation,
{
let tracker = if show_progress {
Some(Arc::new(ProgressTracker::new(
files.len(),
self.config.progress_interval,
self.config.verbose,
)))
} else {
None
};
let results: Vec<FileResult<O::Output>> = files
.par_iter()
.map(|path| {
let result = operation.process_file_streaming(path);
if let Some(ref t) = tracker {
match &result {
Ok(_) => t.record_success(path),
Err(e) => t.record_failure(path, e),
}
}
FileResult {
path: path.clone(),
result: result.map_err(|e| e.clone()),
}
})
.collect();
if show_progress {
if let Some(tracker) = tracker {
tracker.print_summary(operation.name());
}
}
results
}
pub fn process_auto<O, SO>(
&self,
files: &[PathBuf],
standard_op: O,
streaming_op: SO,
show_progress: bool,
) -> Result<BatchResults<O::Output>, CliError>
where
O: BatchOperation<Output = SO::Output>,
SO: StreamingBatchOperation,
{
const STREAMING_THRESHOLD: u64 = 100 * 1024 * 1024;
if files.is_empty() {
return Ok(BatchResults::new(vec![], 0));
}
let start_time = Instant::now();
let mut small_files = Vec::new();
let mut large_files = Vec::new();
for path in files {
match std::fs::metadata(path) {
Ok(meta) if meta.len() > STREAMING_THRESHOLD => {
large_files.push(path.clone());
}
Ok(_) => {
small_files.push(path.clone());
}
Err(_) => {
small_files.push(path.clone());
}
}
}
let mut all_results = if small_files.is_empty() {
Vec::new()
} else {
self.process(&small_files, standard_op, show_progress)?
.results
};
if !large_files.is_empty() {
let streaming_results = self
.process_streaming(&large_files, streaming_op, show_progress)?
.results;
all_results.extend(streaming_results);
}
let file_order: Vec<&PathBuf> = files.iter().collect();
all_results.sort_by_key(|r| {
file_order
.iter()
.position(|&p| p == &r.path)
.unwrap_or(usize::MAX)
});
let elapsed_ms = start_time.elapsed().as_millis();
Ok(BatchResults::new(all_results, elapsed_ms))
}
}