hedl_cli/batch/executor.rs
1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Batch executor implementation.
19
20use super::config::BatchConfig;
21use super::results::{BatchResults, FileResult};
22use super::traits::{BatchOperation, StreamingBatchOperation};
23use crate::error::CliError;
24use colored::Colorize;
25use rayon::prelude::*;
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicUsize, Ordering};
28use std::sync::Arc;
29use std::time::Instant;
30
/// Progress tracker for batch operations.
///
/// Uses atomic counters for lock-free concurrent progress tracking.
#[derive(Debug)]
struct ProgressTracker {
    // Total number of files in the batch; fixed at construction.
    total: usize,
    // Number of files processed so far (successes + failures).
    processed: AtomicUsize,
    // Number of files processed successfully.
    succeeded: AtomicUsize,
    // Number of files whose processing returned an error.
    failed: AtomicUsize,
    // Report progress every `interval` files; 0 disables periodic reports.
    interval: usize,
    // When true, emit per-file status lines instead of aggregate progress.
    verbose: bool,
    // Batch start time; used to compute files/s rates.
    start_time: Instant,
}
44
45impl ProgressTracker {
46 /// Create a new progress tracker.
47 fn new(total: usize, interval: usize, verbose: bool) -> Self {
48 Self {
49 total,
50 processed: AtomicUsize::new(0),
51 succeeded: AtomicUsize::new(0),
52 failed: AtomicUsize::new(0),
53 interval,
54 verbose,
55 start_time: Instant::now(),
56 }
57 }
58
59 /// Record a successful file processing.
60 fn record_success(&self, path: &Path) {
61 let processed = self.processed.fetch_add(1, Ordering::Relaxed) + 1;
62 self.succeeded.fetch_add(1, Ordering::Relaxed);
63
64 if self.should_report(processed) {
65 self.report_progress(path, true);
66 }
67 }
68
69 /// Record a failed file processing.
70 fn record_failure(&self, path: &Path, error: &CliError) {
71 let processed = self.processed.fetch_add(1, Ordering::Relaxed) + 1;
72 self.failed.fetch_add(1, Ordering::Relaxed);
73
74 if self.verbose {
75 eprintln!("{} {} - {}", "✗".red().bold(), path.display(), error);
76 }
77
78 if self.should_report(processed) {
79 self.report_progress(path, false);
80 }
81 }
82
83 /// Check if progress should be reported for this count.
84 fn should_report(&self, processed: usize) -> bool {
85 self.interval > 0 && (processed % self.interval == 0 || processed == self.total)
86 }
87
88 /// Report current progress to stderr.
89 fn report_progress(&self, current_file: &Path, success: bool) {
90 let processed = self.processed.load(Ordering::Relaxed);
91 let succeeded = self.succeeded.load(Ordering::Relaxed);
92 let failed = self.failed.load(Ordering::Relaxed);
93 let elapsed = self.start_time.elapsed();
94 let rate = processed as f64 / elapsed.as_secs_f64();
95
96 if self.verbose {
97 let status = if success {
98 "✓".green().bold()
99 } else {
100 "✗".red().bold()
101 };
102 eprintln!(
103 "{} [{}/{}] {} ({:.1} files/s)",
104 status,
105 processed,
106 self.total,
107 current_file.display(),
108 rate
109 );
110 } else {
111 eprintln!(
112 "Progress: [{}/{}] {} succeeded, {} failed ({:.1} files/s)",
113 processed, self.total, succeeded, failed, rate
114 );
115 }
116 }
117
118 /// Print final summary.
119 fn print_summary(&self, operation_name: &str) {
120 let processed = self.processed.load(Ordering::Relaxed);
121 let succeeded = self.succeeded.load(Ordering::Relaxed);
122 let failed = self.failed.load(Ordering::Relaxed);
123 let elapsed = self.start_time.elapsed();
124
125 println!();
126 println!("{}", "═".repeat(60).bright_blue());
127 println!(
128 "{} {}",
129 "Batch Operation:".bright_blue().bold(),
130 operation_name.bright_white()
131 );
132 println!("{}", "═".repeat(60).bright_blue());
133 println!(
134 " {} {}",
135 "Total files:".bright_cyan(),
136 processed.to_string().bright_white()
137 );
138 println!(
139 " {} {}",
140 "Succeeded:".green().bold(),
141 succeeded.to_string().bright_white()
142 );
143 println!(
144 " {} {}",
145 "Failed:".red().bold(),
146 failed.to_string().bright_white()
147 );
148 println!(
149 " {} {:.2}s",
150 "Elapsed:".bright_cyan(),
151 elapsed.as_secs_f64()
152 );
153 println!(
154 " {} {:.1} files/s",
155 "Throughput:".bright_cyan(),
156 processed as f64 / elapsed.as_secs_f64()
157 );
158 println!("{}", "═".repeat(60).bright_blue());
159 }
160}
161
/// High-performance batch processor for HEDL files.
///
/// Orchestrates parallel or serial processing based on configuration and workload.
/// Provides progress tracking and comprehensive error collection.
///
/// # Thread Safety
///
/// `BatchExecutor` is thread-safe and can be shared across threads via Arc.
///
/// # Examples
///
/// ```rust,no_run
/// use hedl_cli::batch::{BatchExecutor, BatchConfig, ValidationOperation};
/// use std::path::PathBuf;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let processor = BatchExecutor::new(BatchConfig {
///     parallel_threshold: 5,
///     verbose: true,
///     ..Default::default()
/// });
///
/// let files: Vec<PathBuf> = vec![
///     "file1.hedl".into(),
///     "file2.hedl".into(),
/// ];
///
/// let results = processor.process(
///     &files,
///     ValidationOperation { strict: false },
///     true,
/// )?;
///
/// if results.has_failures() {
///     eprintln!("Some files failed validation");
///     for failure in results.failures() {
///         eprintln!("  - {}: {:?}", failure.path.display(), failure.result);
///     }
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct BatchExecutor {
    // Immutable configuration: selects serial vs. parallel execution,
    // thread-pool sizing, and progress-reporting behavior.
    config: BatchConfig,
}
208
209impl BatchExecutor {
210 /// Create a new batch processor with the given configuration.
211 #[must_use]
212 pub fn new(config: BatchConfig) -> Self {
213 Self { config }
214 }
215
216 /// Create a batch processor with default configuration.
217 #[must_use]
218 pub fn default_config() -> Self {
219 Self::new(BatchConfig::default())
220 }
221
222 /// Process multiple files with the given operation.
223 ///
224 /// Automatically selects parallel or serial processing based on configuration
225 /// and file count. Provides progress reporting and collects all results.
226 ///
227 /// # Arguments
228 ///
229 /// * `files` - Slice of file paths to process
230 /// * `operation` - The operation to perform on each file
231 /// * `show_progress` - Whether to show progress updates
232 ///
233 /// # Returns
234 ///
235 /// * `Ok(BatchResults)` - Successfully processed all files (individual failures collected in results)
236 /// * `Err(CliError::ThreadPoolError)` - Failed to create thread pool with requested configuration
237 ///
238 /// # Thread Pool Selection
239 ///
240 /// The method uses different thread pool strategies based on configuration:
241 ///
242 /// 1. **Serial Processing**: If `files.len() < parallel_threshold`, processes serially (no thread pool)
243 /// 2. **Local Thread Pool**: If `max_threads` is `Some(n)`, creates isolated pool with `n` threads
244 /// 3. **Global Thread Pool**: If `max_threads` is `None`, uses Rayon's global pool
245 ///
246 /// # Error Handling
247 ///
248 /// Thread pool creation can fail if:
249 /// - `max_threads` is 0 (invalid configuration)
250 /// - System cannot allocate thread resources
251 /// - Thread stack allocation fails
252 ///
253 /// Individual file processing errors are collected in `BatchResults`, not returned as errors.
254 ///
255 /// # Performance
256 ///
257 /// - Serial processing for small batches to avoid thread pool overhead
258 /// - Local thread pool: ~0.5-1ms creation overhead, ~2-8MB per thread
259 /// - Global thread pool: zero overhead
260 /// - Lock-free progress tracking using atomic counters
261 ///
262 /// # Examples
263 ///
264 /// ```rust,no_run
265 /// use hedl_cli::batch::{BatchExecutor, BatchConfig, FormatOperation};
266 /// use hedl_cli::error::CliError;
267 /// use std::path::PathBuf;
268 ///
269 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
270 /// let processor = BatchExecutor::new(BatchConfig {
271 /// max_threads: Some(4),
272 /// ..Default::default()
273 /// });
274 ///
275 /// let files = vec![PathBuf::from("a.hedl"), PathBuf::from("b.hedl")];
276 ///
277 /// match processor.process(
278 /// &files,
279 /// FormatOperation {
280 /// check: false,
281 /// ditto: true,
282 /// with_counts: false,
283 /// },
284 /// true,
285 /// ) {
286 /// Ok(results) => {
287 /// println!("Formatted {} files", results.success_count());
288 /// if results.has_failures() {
289 /// // Handle individual file failures
290 /// }
291 /// }
292 /// Err(CliError::ThreadPoolError { message, requested_threads }) => {
293 /// eprintln!("Failed to create thread pool: {}", message);
294 /// }
295 /// Err(e) => {
296 /// eprintln!("Unexpected error: {}", e);
297 /// }
298 /// }
299 /// # Ok(())
300 /// # }
301 /// ```
302 pub fn process<O>(
303 &self,
304 files: &[PathBuf],
305 operation: O,
306 show_progress: bool,
307 ) -> Result<BatchResults<O::Output>, CliError>
308 where
309 O: BatchOperation,
310 {
311 let start_time = Instant::now();
312
313 if files.is_empty() {
314 // Warn the user when no files match the provided patterns.
315 // Only show warning when progress is enabled, as this indicates
316 // the user expects feedback. Silent mode implies automated/scripted usage.
317 if show_progress {
318 eprintln!(
319 "{} No files matched the provided patterns",
320 "Warning:".yellow().bold()
321 );
322 eprintln!(
323 "{} Check that patterns are correct and files exist",
324 "Hint:".cyan()
325 );
326 }
327 return Ok(BatchResults::new(vec![], 0));
328 }
329
330 let results = if files.len() < self.config.parallel_threshold {
331 // Serial processing for small batches (no thread pool needed)
332 self.process_serial(files, &operation, show_progress)
333 } else if let Some(max_threads) = self.config.max_threads {
334 // Use local thread pool with specified thread count
335 self.process_with_local_pool(files, &operation, show_progress, max_threads)?
336 } else {
337 // Use default global thread pool (Rayon's default)
338 self.process_parallel(files, &operation, show_progress)
339 };
340
341 let elapsed_ms = start_time.elapsed().as_millis();
342
343 Ok(BatchResults::new(results, elapsed_ms))
344 }
345
346 /// Process files using a local thread pool with specified thread count.
347 ///
348 /// Creates an isolated Rayon thread pool that doesn't affect global state.
349 /// The thread pool is created for this operation and destroyed when complete.
350 ///
351 /// # Arguments
352 ///
353 /// * `files` - Slice of file paths to process
354 /// * `operation` - The operation to perform on each file
355 /// * `show_progress` - Whether to show progress updates
356 /// * `num_threads` - The number of threads to use
357 ///
358 /// # Returns
359 ///
360 /// * `Ok(Vec<FileResult>)` - Successfully processed files with local pool
361 /// * `Err(CliError::ThreadPoolError)` - Failed to create thread pool
362 ///
363 /// # Errors
364 ///
365 /// Returns `ThreadPoolError` if:
366 /// - `num_threads` is 0 (invalid configuration)
367 /// - System cannot allocate thread resources
368 /// - Thread stack allocation fails
369 ///
370 /// # Performance
371 ///
372 /// - Thread pool creation: ~0.5-1ms overhead
373 /// - Memory cost: ~2-8MB per thread (OS thread stacks)
374 /// - Pool lifetime: Duration of this method call
375 fn process_with_local_pool<O>(
376 &self,
377 files: &[PathBuf],
378 operation: &O,
379 show_progress: bool,
380 num_threads: usize,
381 ) -> Result<Vec<FileResult<O::Output>>, CliError>
382 where
383 O: BatchOperation,
384 {
385 // Validate thread count - 0 threads is invalid
386 if num_threads == 0 {
387 return Err(CliError::thread_pool_error(
388 "Cannot create thread pool with 0 threads".to_string(),
389 num_threads,
390 ));
391 }
392
393 // Build a local thread pool
394 let pool = rayon::ThreadPoolBuilder::new()
395 .num_threads(num_threads)
396 .build()
397 .map_err(|e| {
398 CliError::thread_pool_error(
399 format!("Failed to create thread pool with {num_threads} threads: {e}"),
400 num_threads,
401 )
402 })?;
403
404 // Run parallel processing within the local pool
405 let results = pool.install(|| self.process_parallel(files, operation, show_progress));
406
407 Ok(results)
408 }
409
410 /// Process files serially (single-threaded).
411 fn process_serial<O>(
412 &self,
413 files: &[PathBuf],
414 operation: &O,
415 show_progress: bool,
416 ) -> Vec<FileResult<O::Output>>
417 where
418 O: BatchOperation,
419 {
420 let tracker = if show_progress {
421 Some(ProgressTracker::new(
422 files.len(),
423 self.config.progress_interval,
424 self.config.verbose,
425 ))
426 } else {
427 None
428 };
429
430 let results: Vec<FileResult<O::Output>> = files
431 .iter()
432 .map(|path| {
433 let result = operation.process_file(path);
434
435 if let Some(ref t) = tracker {
436 match &result {
437 Ok(_) => t.record_success(path),
438 Err(e) => t.record_failure(path, e),
439 }
440 }
441
442 FileResult {
443 path: path.clone(),
444 result: result.map_err(|e| e.clone()),
445 }
446 })
447 .collect();
448
449 if show_progress {
450 if let Some(tracker) = tracker {
451 tracker.print_summary(operation.name());
452 }
453 }
454
455 results
456 }
457
458 /// Process files in parallel using Rayon.
459 fn process_parallel<O>(
460 &self,
461 files: &[PathBuf],
462 operation: &O,
463 show_progress: bool,
464 ) -> Vec<FileResult<O::Output>>
465 where
466 O: BatchOperation,
467 {
468 let tracker = if show_progress {
469 Some(Arc::new(ProgressTracker::new(
470 files.len(),
471 self.config.progress_interval,
472 self.config.verbose,
473 )))
474 } else {
475 None
476 };
477
478 let results: Vec<FileResult<O::Output>> = files
479 .par_iter()
480 .map(|path| {
481 let result = operation.process_file(path);
482
483 if let Some(ref t) = tracker {
484 match &result {
485 Ok(_) => t.record_success(path),
486 Err(e) => t.record_failure(path, e),
487 }
488 }
489
490 FileResult {
491 path: path.clone(),
492 result: result.map_err(|e| e.clone()),
493 }
494 })
495 .collect();
496
497 if show_progress {
498 if let Some(tracker) = tracker {
499 tracker.print_summary(operation.name());
500 }
501 }
502
503 results
504 }
505
506 /// Process files using streaming operations for memory efficiency.
507 ///
508 /// This method uses the streaming parser from `hedl-stream` to process files
509 /// with constant memory usage regardless of file size. Ideal for:
510 /// - Files larger than 100MB
511 /// - Memory-constrained environments
512 /// - Processing thousands of files
513 ///
514 /// # Arguments
515 ///
516 /// * `files` - Slice of file paths to process
517 /// * `operation` - The streaming operation to perform
518 /// * `show_progress` - Whether to show progress updates
519 ///
520 /// # Returns
521 ///
522 /// * `Ok(BatchResults)` - Always succeeds and collects all individual results
523 /// * `Err(CliError)` - Only on catastrophic failures
524 ///
525 /// # Memory Usage
526 ///
527 /// Peak memory = `buffer_size` (8KB) × `num_threads` + ID tracking set
528 ///
529 /// # Examples
530 ///
531 /// ```rust,no_run
532 /// use hedl_cli::batch::{BatchExecutor, StreamingValidationOperation, BatchConfig};
533 /// use std::path::PathBuf;
534 ///
535 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
536 /// let processor = BatchExecutor::default_config();
537 /// let files = vec![PathBuf::from("large-file.hedl")];
538 /// let operation = StreamingValidationOperation { strict: false };
539 ///
540 /// let results = processor.process_streaming(&files, operation, true)?;
541 /// println!("Processed {} files with constant memory", results.success_count());
542 /// # Ok(())
543 /// # }
544 /// ```
545 pub fn process_streaming<O>(
546 &self,
547 files: &[PathBuf],
548 operation: O,
549 show_progress: bool,
550 ) -> Result<BatchResults<O::Output>, CliError>
551 where
552 O: StreamingBatchOperation,
553 {
554 let start_time = Instant::now();
555
556 if files.is_empty() {
557 return Ok(BatchResults::new(vec![], 0));
558 }
559
560 // Configure thread pool if max_threads is specified
561 if let Some(max_threads) = self.config.max_threads {
562 rayon::ThreadPoolBuilder::new()
563 .num_threads(max_threads)
564 .build_global()
565 .ok(); // Ignore error if already initialized
566 }
567
568 let results = if files.len() < self.config.parallel_threshold {
569 self.process_streaming_serial(files, &operation, show_progress)
570 } else {
571 self.process_streaming_parallel(files, &operation, show_progress)
572 };
573
574 let elapsed_ms = start_time.elapsed().as_millis();
575 Ok(BatchResults::new(results, elapsed_ms))
576 }
577
578 /// Process files serially using streaming.
579 fn process_streaming_serial<O>(
580 &self,
581 files: &[PathBuf],
582 operation: &O,
583 show_progress: bool,
584 ) -> Vec<FileResult<O::Output>>
585 where
586 O: StreamingBatchOperation,
587 {
588 let tracker = if show_progress {
589 Some(ProgressTracker::new(
590 files.len(),
591 self.config.progress_interval,
592 self.config.verbose,
593 ))
594 } else {
595 None
596 };
597
598 let results: Vec<FileResult<O::Output>> = files
599 .iter()
600 .map(|path| {
601 let result = operation.process_file_streaming(path);
602
603 if let Some(ref t) = tracker {
604 match &result {
605 Ok(_) => t.record_success(path),
606 Err(e) => t.record_failure(path, e),
607 }
608 }
609
610 FileResult {
611 path: path.clone(),
612 result: result.map_err(|e| e.clone()),
613 }
614 })
615 .collect();
616
617 if show_progress {
618 if let Some(tracker) = tracker {
619 tracker.print_summary(operation.name());
620 }
621 }
622
623 results
624 }
625
626 /// Process files in parallel using streaming.
627 fn process_streaming_parallel<O>(
628 &self,
629 files: &[PathBuf],
630 operation: &O,
631 show_progress: bool,
632 ) -> Vec<FileResult<O::Output>>
633 where
634 O: StreamingBatchOperation,
635 {
636 let tracker = if show_progress {
637 Some(Arc::new(ProgressTracker::new(
638 files.len(),
639 self.config.progress_interval,
640 self.config.verbose,
641 )))
642 } else {
643 None
644 };
645
646 let results: Vec<FileResult<O::Output>> = files
647 .par_iter()
648 .map(|path| {
649 let result = operation.process_file_streaming(path);
650
651 if let Some(ref t) = tracker {
652 match &result {
653 Ok(_) => t.record_success(path),
654 Err(e) => t.record_failure(path, e),
655 }
656 }
657
658 FileResult {
659 path: path.clone(),
660 result: result.map_err(|e| e.clone()),
661 }
662 })
663 .collect();
664
665 if show_progress {
666 if let Some(tracker) = tracker {
667 tracker.print_summary(operation.name());
668 }
669 }
670
671 results
672 }
673
674 /// Automatically choose between standard and streaming based on file size.
675 ///
676 /// Files larger than 100MB use streaming mode for memory efficiency,
677 /// while smaller files use standard mode for better performance.
678 ///
679 /// # Arguments
680 ///
681 /// * `files` - Slice of file paths to process
682 /// * `standard_op` - Standard operation for small files
683 /// * `streaming_op` - Streaming operation for large files
684 /// * `show_progress` - Whether to show progress updates
685 ///
686 /// # Returns
687 ///
688 /// * `Ok(BatchResults)` - Combined results from both modes
689 /// * `Err(CliError)` - On catastrophic failures
690 ///
691 /// # Examples
692 ///
693 /// ```rust,no_run
694 /// use hedl_cli::batch::{BatchExecutor, ValidationOperation, StreamingValidationOperation};
695 /// use std::path::PathBuf;
696 ///
697 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
698 /// let processor = BatchExecutor::default_config();
699 /// let files = vec![
700 /// PathBuf::from("small.hedl"),
701 /// PathBuf::from("large-200mb.hedl"),
702 /// ];
703 ///
704 /// let results = processor.process_auto(
705 /// &files,
706 /// ValidationOperation { strict: false },
707 /// StreamingValidationOperation { strict: false },
708 /// true,
709 /// )?;
710 /// println!("Processed {} files", results.results.len());
711 /// # Ok(())
712 /// # }
713 /// ```
714 pub fn process_auto<O, SO>(
715 &self,
716 files: &[PathBuf],
717 standard_op: O,
718 streaming_op: SO,
719 show_progress: bool,
720 ) -> Result<BatchResults<O::Output>, CliError>
721 where
722 O: BatchOperation<Output = SO::Output>,
723 SO: StreamingBatchOperation,
724 {
725 const STREAMING_THRESHOLD: u64 = 100 * 1024 * 1024; // 100MB
726
727 if files.is_empty() {
728 return Ok(BatchResults::new(vec![], 0));
729 }
730
731 let start_time = Instant::now();
732
733 // Partition files by size
734 let mut small_files = Vec::new();
735 let mut large_files = Vec::new();
736
737 for path in files {
738 match std::fs::metadata(path) {
739 Ok(meta) if meta.len() > STREAMING_THRESHOLD => {
740 large_files.push(path.clone());
741 }
742 Ok(_) => {
743 small_files.push(path.clone());
744 }
745 Err(_) => {
746 // If we can't get size, treat as small
747 small_files.push(path.clone());
748 }
749 }
750 }
751
752 // Process small files with standard ops
753 let mut all_results = if small_files.is_empty() {
754 Vec::new()
755 } else {
756 self.process(&small_files, standard_op, show_progress)?
757 .results
758 };
759
760 // Process large files with streaming ops
761 if !large_files.is_empty() {
762 let streaming_results = self
763 .process_streaming(&large_files, streaming_op, show_progress)?
764 .results;
765 all_results.extend(streaming_results);
766 }
767
768 // Restore original order
769 let file_order: Vec<&PathBuf> = files.iter().collect();
770 all_results.sort_by_key(|r| {
771 file_order
772 .iter()
773 .position(|&p| p == &r.path)
774 .unwrap_or(usize::MAX)
775 });
776
777 let elapsed_ms = start_time.elapsed().as_millis();
778 Ok(BatchResults::new(all_results, elapsed_ms))
779 }
780}