// hedl_cli/batch/executor.rs

// Dweve HEDL - Hierarchical Entity Data Language
//
// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Batch executor implementation.

use super::config::BatchConfig;
use super::results::{BatchResults, FileResult};
use super::traits::{BatchOperation, StreamingBatchOperation};
use crate::error::CliError;
use colored::Colorize;
use rayon::prelude::*;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
30
/// Progress tracker for batch operations.
///
/// Uses atomic counters for lock-free concurrent progress tracking.
#[derive(Debug)]
struct ProgressTracker {
    /// Total number of files in the batch (fixed at construction).
    total: usize,
    /// Files processed so far, successful or not.
    processed: AtomicUsize,
    /// Files processed successfully.
    succeeded: AtomicUsize,
    /// Files that failed processing.
    failed: AtomicUsize,
    /// Report progress every `interval` files; 0 disables periodic reports.
    interval: usize,
    /// When true, emit a per-file status line instead of aggregate progress.
    verbose: bool,
    /// Start of the batch, used for elapsed time and throughput.
    start_time: Instant,
}
44
45impl ProgressTracker {
46    /// Create a new progress tracker.
47    fn new(total: usize, interval: usize, verbose: bool) -> Self {
48        Self {
49            total,
50            processed: AtomicUsize::new(0),
51            succeeded: AtomicUsize::new(0),
52            failed: AtomicUsize::new(0),
53            interval,
54            verbose,
55            start_time: Instant::now(),
56        }
57    }
58
59    /// Record a successful file processing.
60    fn record_success(&self, path: &Path) {
61        let processed = self.processed.fetch_add(1, Ordering::Relaxed) + 1;
62        self.succeeded.fetch_add(1, Ordering::Relaxed);
63
64        if self.should_report(processed) {
65            self.report_progress(path, true);
66        }
67    }
68
69    /// Record a failed file processing.
70    fn record_failure(&self, path: &Path, error: &CliError) {
71        let processed = self.processed.fetch_add(1, Ordering::Relaxed) + 1;
72        self.failed.fetch_add(1, Ordering::Relaxed);
73
74        if self.verbose {
75            eprintln!("{} {} - {}", "✗".red().bold(), path.display(), error);
76        }
77
78        if self.should_report(processed) {
79            self.report_progress(path, false);
80        }
81    }
82
83    /// Check if progress should be reported for this count.
84    fn should_report(&self, processed: usize) -> bool {
85        self.interval > 0 && (processed % self.interval == 0 || processed == self.total)
86    }
87
88    /// Report current progress to stderr.
89    fn report_progress(&self, current_file: &Path, success: bool) {
90        let processed = self.processed.load(Ordering::Relaxed);
91        let succeeded = self.succeeded.load(Ordering::Relaxed);
92        let failed = self.failed.load(Ordering::Relaxed);
93        let elapsed = self.start_time.elapsed();
94        let rate = processed as f64 / elapsed.as_secs_f64();
95
96        if self.verbose {
97            let status = if success {
98                "✓".green().bold()
99            } else {
100                "✗".red().bold()
101            };
102            eprintln!(
103                "{} [{}/{}] {} ({:.1} files/s)",
104                status,
105                processed,
106                self.total,
107                current_file.display(),
108                rate
109            );
110        } else {
111            eprintln!(
112                "Progress: [{}/{}] {} succeeded, {} failed ({:.1} files/s)",
113                processed, self.total, succeeded, failed, rate
114            );
115        }
116    }
117
118    /// Print final summary.
119    fn print_summary(&self, operation_name: &str) {
120        let processed = self.processed.load(Ordering::Relaxed);
121        let succeeded = self.succeeded.load(Ordering::Relaxed);
122        let failed = self.failed.load(Ordering::Relaxed);
123        let elapsed = self.start_time.elapsed();
124
125        println!();
126        println!("{}", "═".repeat(60).bright_blue());
127        println!(
128            "{} {}",
129            "Batch Operation:".bright_blue().bold(),
130            operation_name.bright_white()
131        );
132        println!("{}", "═".repeat(60).bright_blue());
133        println!(
134            "  {} {}",
135            "Total files:".bright_cyan(),
136            processed.to_string().bright_white()
137        );
138        println!(
139            "  {} {}",
140            "Succeeded:".green().bold(),
141            succeeded.to_string().bright_white()
142        );
143        println!(
144            "  {} {}",
145            "Failed:".red().bold(),
146            failed.to_string().bright_white()
147        );
148        println!(
149            "  {} {:.2}s",
150            "Elapsed:".bright_cyan(),
151            elapsed.as_secs_f64()
152        );
153        println!(
154            "  {} {:.1} files/s",
155            "Throughput:".bright_cyan(),
156            processed as f64 / elapsed.as_secs_f64()
157        );
158        println!("{}", "═".repeat(60).bright_blue());
159    }
160}
161
/// High-performance batch processor for HEDL files.
///
/// Orchestrates parallel or serial processing based on configuration and workload.
/// Provides progress tracking and comprehensive error collection.
///
/// # Thread Safety
///
/// `BatchExecutor` is thread-safe and can be shared across threads via Arc.
///
/// # Examples
///
/// ```rust,no_run
/// use hedl_cli::batch::{BatchExecutor, BatchConfig, ValidationOperation};
/// use std::path::PathBuf;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let processor = BatchExecutor::new(BatchConfig {
///     parallel_threshold: 5,
///     verbose: true,
///     ..Default::default()
/// });
///
/// let files: Vec<PathBuf> = vec![
///     "file1.hedl".into(),
///     "file2.hedl".into(),
/// ];
///
/// let results = processor.process(
///     &files,
///     ValidationOperation { strict: false },
///     true,
/// )?;
///
/// if results.has_failures() {
///     eprintln!("Some files failed validation");
///     for failure in results.failures() {
///         eprintln!("  - {}: {:?}", failure.path.display(), failure.result);
///     }
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct BatchExecutor {
    /// Batch processing configuration (thresholds, thread count, verbosity).
    config: BatchConfig,
}
208
209impl BatchExecutor {
210    /// Create a new batch processor with the given configuration.
211    #[must_use]
212    pub fn new(config: BatchConfig) -> Self {
213        Self { config }
214    }
215
216    /// Create a batch processor with default configuration.
217    #[must_use]
218    pub fn default_config() -> Self {
219        Self::new(BatchConfig::default())
220    }
221
222    /// Process multiple files with the given operation.
223    ///
224    /// Automatically selects parallel or serial processing based on configuration
225    /// and file count. Provides progress reporting and collects all results.
226    ///
227    /// # Arguments
228    ///
229    /// * `files` - Slice of file paths to process
230    /// * `operation` - The operation to perform on each file
231    /// * `show_progress` - Whether to show progress updates
232    ///
233    /// # Returns
234    ///
235    /// * `Ok(BatchResults)` - Successfully processed all files (individual failures collected in results)
236    /// * `Err(CliError::ThreadPoolError)` - Failed to create thread pool with requested configuration
237    ///
238    /// # Thread Pool Selection
239    ///
240    /// The method uses different thread pool strategies based on configuration:
241    ///
242    /// 1. **Serial Processing**: If `files.len() < parallel_threshold`, processes serially (no thread pool)
243    /// 2. **Local Thread Pool**: If `max_threads` is `Some(n)`, creates isolated pool with `n` threads
244    /// 3. **Global Thread Pool**: If `max_threads` is `None`, uses Rayon's global pool
245    ///
246    /// # Error Handling
247    ///
248    /// Thread pool creation can fail if:
249    /// - `max_threads` is 0 (invalid configuration)
250    /// - System cannot allocate thread resources
251    /// - Thread stack allocation fails
252    ///
253    /// Individual file processing errors are collected in `BatchResults`, not returned as errors.
254    ///
255    /// # Performance
256    ///
257    /// - Serial processing for small batches to avoid thread pool overhead
258    /// - Local thread pool: ~0.5-1ms creation overhead, ~2-8MB per thread
259    /// - Global thread pool: zero overhead
260    /// - Lock-free progress tracking using atomic counters
261    ///
262    /// # Examples
263    ///
264    /// ```rust,no_run
265    /// use hedl_cli::batch::{BatchExecutor, BatchConfig, FormatOperation};
266    /// use hedl_cli::error::CliError;
267    /// use std::path::PathBuf;
268    ///
269    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
270    /// let processor = BatchExecutor::new(BatchConfig {
271    ///     max_threads: Some(4),
272    ///     ..Default::default()
273    /// });
274    ///
275    /// let files = vec![PathBuf::from("a.hedl"), PathBuf::from("b.hedl")];
276    ///
277    /// match processor.process(
278    ///     &files,
279    ///     FormatOperation {
280    ///         check: false,
281    ///         ditto: true,
282    ///         with_counts: false,
283    ///     },
284    ///     true,
285    /// ) {
286    ///     Ok(results) => {
287    ///         println!("Formatted {} files", results.success_count());
288    ///         if results.has_failures() {
289    ///             // Handle individual file failures
290    ///         }
291    ///     }
292    ///     Err(CliError::ThreadPoolError { message, requested_threads }) => {
293    ///         eprintln!("Failed to create thread pool: {}", message);
294    ///     }
295    ///     Err(e) => {
296    ///         eprintln!("Unexpected error: {}", e);
297    ///     }
298    /// }
299    /// # Ok(())
300    /// # }
301    /// ```
302    pub fn process<O>(
303        &self,
304        files: &[PathBuf],
305        operation: O,
306        show_progress: bool,
307    ) -> Result<BatchResults<O::Output>, CliError>
308    where
309        O: BatchOperation,
310    {
311        let start_time = Instant::now();
312
313        if files.is_empty() {
314            // Warn the user when no files match the provided patterns.
315            // Only show warning when progress is enabled, as this indicates
316            // the user expects feedback. Silent mode implies automated/scripted usage.
317            if show_progress {
318                eprintln!(
319                    "{} No files matched the provided patterns",
320                    "Warning:".yellow().bold()
321                );
322                eprintln!(
323                    "{} Check that patterns are correct and files exist",
324                    "Hint:".cyan()
325                );
326            }
327            return Ok(BatchResults::new(vec![], 0));
328        }
329
330        let results = if files.len() < self.config.parallel_threshold {
331            // Serial processing for small batches (no thread pool needed)
332            self.process_serial(files, &operation, show_progress)
333        } else if let Some(max_threads) = self.config.max_threads {
334            // Use local thread pool with specified thread count
335            self.process_with_local_pool(files, &operation, show_progress, max_threads)?
336        } else {
337            // Use default global thread pool (Rayon's default)
338            self.process_parallel(files, &operation, show_progress)
339        };
340
341        let elapsed_ms = start_time.elapsed().as_millis();
342
343        Ok(BatchResults::new(results, elapsed_ms))
344    }
345
346    /// Process files using a local thread pool with specified thread count.
347    ///
348    /// Creates an isolated Rayon thread pool that doesn't affect global state.
349    /// The thread pool is created for this operation and destroyed when complete.
350    ///
351    /// # Arguments
352    ///
353    /// * `files` - Slice of file paths to process
354    /// * `operation` - The operation to perform on each file
355    /// * `show_progress` - Whether to show progress updates
356    /// * `num_threads` - The number of threads to use
357    ///
358    /// # Returns
359    ///
360    /// * `Ok(Vec<FileResult>)` - Successfully processed files with local pool
361    /// * `Err(CliError::ThreadPoolError)` - Failed to create thread pool
362    ///
363    /// # Errors
364    ///
365    /// Returns `ThreadPoolError` if:
366    /// - `num_threads` is 0 (invalid configuration)
367    /// - System cannot allocate thread resources
368    /// - Thread stack allocation fails
369    ///
370    /// # Performance
371    ///
372    /// - Thread pool creation: ~0.5-1ms overhead
373    /// - Memory cost: ~2-8MB per thread (OS thread stacks)
374    /// - Pool lifetime: Duration of this method call
375    fn process_with_local_pool<O>(
376        &self,
377        files: &[PathBuf],
378        operation: &O,
379        show_progress: bool,
380        num_threads: usize,
381    ) -> Result<Vec<FileResult<O::Output>>, CliError>
382    where
383        O: BatchOperation,
384    {
385        // Validate thread count - 0 threads is invalid
386        if num_threads == 0 {
387            return Err(CliError::thread_pool_error(
388                "Cannot create thread pool with 0 threads".to_string(),
389                num_threads,
390            ));
391        }
392
393        // Build a local thread pool
394        let pool = rayon::ThreadPoolBuilder::new()
395            .num_threads(num_threads)
396            .build()
397            .map_err(|e| {
398                CliError::thread_pool_error(
399                    format!("Failed to create thread pool with {num_threads} threads: {e}"),
400                    num_threads,
401                )
402            })?;
403
404        // Run parallel processing within the local pool
405        let results = pool.install(|| self.process_parallel(files, operation, show_progress));
406
407        Ok(results)
408    }
409
410    /// Process files serially (single-threaded).
411    fn process_serial<O>(
412        &self,
413        files: &[PathBuf],
414        operation: &O,
415        show_progress: bool,
416    ) -> Vec<FileResult<O::Output>>
417    where
418        O: BatchOperation,
419    {
420        let tracker = if show_progress {
421            Some(ProgressTracker::new(
422                files.len(),
423                self.config.progress_interval,
424                self.config.verbose,
425            ))
426        } else {
427            None
428        };
429
430        let results: Vec<FileResult<O::Output>> = files
431            .iter()
432            .map(|path| {
433                let result = operation.process_file(path);
434
435                if let Some(ref t) = tracker {
436                    match &result {
437                        Ok(_) => t.record_success(path),
438                        Err(e) => t.record_failure(path, e),
439                    }
440                }
441
442                FileResult {
443                    path: path.clone(),
444                    result: result.map_err(|e| e.clone()),
445                }
446            })
447            .collect();
448
449        if show_progress {
450            if let Some(tracker) = tracker {
451                tracker.print_summary(operation.name());
452            }
453        }
454
455        results
456    }
457
458    /// Process files in parallel using Rayon.
459    fn process_parallel<O>(
460        &self,
461        files: &[PathBuf],
462        operation: &O,
463        show_progress: bool,
464    ) -> Vec<FileResult<O::Output>>
465    where
466        O: BatchOperation,
467    {
468        let tracker = if show_progress {
469            Some(Arc::new(ProgressTracker::new(
470                files.len(),
471                self.config.progress_interval,
472                self.config.verbose,
473            )))
474        } else {
475            None
476        };
477
478        let results: Vec<FileResult<O::Output>> = files
479            .par_iter()
480            .map(|path| {
481                let result = operation.process_file(path);
482
483                if let Some(ref t) = tracker {
484                    match &result {
485                        Ok(_) => t.record_success(path),
486                        Err(e) => t.record_failure(path, e),
487                    }
488                }
489
490                FileResult {
491                    path: path.clone(),
492                    result: result.map_err(|e| e.clone()),
493                }
494            })
495            .collect();
496
497        if show_progress {
498            if let Some(tracker) = tracker {
499                tracker.print_summary(operation.name());
500            }
501        }
502
503        results
504    }
505
506    /// Process files using streaming operations for memory efficiency.
507    ///
508    /// This method uses the streaming parser from `hedl-stream` to process files
509    /// with constant memory usage regardless of file size. Ideal for:
510    /// - Files larger than 100MB
511    /// - Memory-constrained environments
512    /// - Processing thousands of files
513    ///
514    /// # Arguments
515    ///
516    /// * `files` - Slice of file paths to process
517    /// * `operation` - The streaming operation to perform
518    /// * `show_progress` - Whether to show progress updates
519    ///
520    /// # Returns
521    ///
522    /// * `Ok(BatchResults)` - Always succeeds and collects all individual results
523    /// * `Err(CliError)` - Only on catastrophic failures
524    ///
525    /// # Memory Usage
526    ///
527    /// Peak memory = `buffer_size` (8KB) × `num_threads` + ID tracking set
528    ///
529    /// # Examples
530    ///
531    /// ```rust,no_run
532    /// use hedl_cli::batch::{BatchExecutor, StreamingValidationOperation, BatchConfig};
533    /// use std::path::PathBuf;
534    ///
535    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
536    /// let processor = BatchExecutor::default_config();
537    /// let files = vec![PathBuf::from("large-file.hedl")];
538    /// let operation = StreamingValidationOperation { strict: false };
539    ///
540    /// let results = processor.process_streaming(&files, operation, true)?;
541    /// println!("Processed {} files with constant memory", results.success_count());
542    /// # Ok(())
543    /// # }
544    /// ```
545    pub fn process_streaming<O>(
546        &self,
547        files: &[PathBuf],
548        operation: O,
549        show_progress: bool,
550    ) -> Result<BatchResults<O::Output>, CliError>
551    where
552        O: StreamingBatchOperation,
553    {
554        let start_time = Instant::now();
555
556        if files.is_empty() {
557            return Ok(BatchResults::new(vec![], 0));
558        }
559
560        // Configure thread pool if max_threads is specified
561        if let Some(max_threads) = self.config.max_threads {
562            rayon::ThreadPoolBuilder::new()
563                .num_threads(max_threads)
564                .build_global()
565                .ok(); // Ignore error if already initialized
566        }
567
568        let results = if files.len() < self.config.parallel_threshold {
569            self.process_streaming_serial(files, &operation, show_progress)
570        } else {
571            self.process_streaming_parallel(files, &operation, show_progress)
572        };
573
574        let elapsed_ms = start_time.elapsed().as_millis();
575        Ok(BatchResults::new(results, elapsed_ms))
576    }
577
578    /// Process files serially using streaming.
579    fn process_streaming_serial<O>(
580        &self,
581        files: &[PathBuf],
582        operation: &O,
583        show_progress: bool,
584    ) -> Vec<FileResult<O::Output>>
585    where
586        O: StreamingBatchOperation,
587    {
588        let tracker = if show_progress {
589            Some(ProgressTracker::new(
590                files.len(),
591                self.config.progress_interval,
592                self.config.verbose,
593            ))
594        } else {
595            None
596        };
597
598        let results: Vec<FileResult<O::Output>> = files
599            .iter()
600            .map(|path| {
601                let result = operation.process_file_streaming(path);
602
603                if let Some(ref t) = tracker {
604                    match &result {
605                        Ok(_) => t.record_success(path),
606                        Err(e) => t.record_failure(path, e),
607                    }
608                }
609
610                FileResult {
611                    path: path.clone(),
612                    result: result.map_err(|e| e.clone()),
613                }
614            })
615            .collect();
616
617        if show_progress {
618            if let Some(tracker) = tracker {
619                tracker.print_summary(operation.name());
620            }
621        }
622
623        results
624    }
625
626    /// Process files in parallel using streaming.
627    fn process_streaming_parallel<O>(
628        &self,
629        files: &[PathBuf],
630        operation: &O,
631        show_progress: bool,
632    ) -> Vec<FileResult<O::Output>>
633    where
634        O: StreamingBatchOperation,
635    {
636        let tracker = if show_progress {
637            Some(Arc::new(ProgressTracker::new(
638                files.len(),
639                self.config.progress_interval,
640                self.config.verbose,
641            )))
642        } else {
643            None
644        };
645
646        let results: Vec<FileResult<O::Output>> = files
647            .par_iter()
648            .map(|path| {
649                let result = operation.process_file_streaming(path);
650
651                if let Some(ref t) = tracker {
652                    match &result {
653                        Ok(_) => t.record_success(path),
654                        Err(e) => t.record_failure(path, e),
655                    }
656                }
657
658                FileResult {
659                    path: path.clone(),
660                    result: result.map_err(|e| e.clone()),
661                }
662            })
663            .collect();
664
665        if show_progress {
666            if let Some(tracker) = tracker {
667                tracker.print_summary(operation.name());
668            }
669        }
670
671        results
672    }
673
674    /// Automatically choose between standard and streaming based on file size.
675    ///
676    /// Files larger than 100MB use streaming mode for memory efficiency,
677    /// while smaller files use standard mode for better performance.
678    ///
679    /// # Arguments
680    ///
681    /// * `files` - Slice of file paths to process
682    /// * `standard_op` - Standard operation for small files
683    /// * `streaming_op` - Streaming operation for large files
684    /// * `show_progress` - Whether to show progress updates
685    ///
686    /// # Returns
687    ///
688    /// * `Ok(BatchResults)` - Combined results from both modes
689    /// * `Err(CliError)` - On catastrophic failures
690    ///
691    /// # Examples
692    ///
693    /// ```rust,no_run
694    /// use hedl_cli::batch::{BatchExecutor, ValidationOperation, StreamingValidationOperation};
695    /// use std::path::PathBuf;
696    ///
697    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
698    /// let processor = BatchExecutor::default_config();
699    /// let files = vec![
700    ///     PathBuf::from("small.hedl"),
701    ///     PathBuf::from("large-200mb.hedl"),
702    /// ];
703    ///
704    /// let results = processor.process_auto(
705    ///     &files,
706    ///     ValidationOperation { strict: false },
707    ///     StreamingValidationOperation { strict: false },
708    ///     true,
709    /// )?;
710    /// println!("Processed {} files", results.results.len());
711    /// # Ok(())
712    /// # }
713    /// ```
714    pub fn process_auto<O, SO>(
715        &self,
716        files: &[PathBuf],
717        standard_op: O,
718        streaming_op: SO,
719        show_progress: bool,
720    ) -> Result<BatchResults<O::Output>, CliError>
721    where
722        O: BatchOperation<Output = SO::Output>,
723        SO: StreamingBatchOperation,
724    {
725        const STREAMING_THRESHOLD: u64 = 100 * 1024 * 1024; // 100MB
726
727        if files.is_empty() {
728            return Ok(BatchResults::new(vec![], 0));
729        }
730
731        let start_time = Instant::now();
732
733        // Partition files by size
734        let mut small_files = Vec::new();
735        let mut large_files = Vec::new();
736
737        for path in files {
738            match std::fs::metadata(path) {
739                Ok(meta) if meta.len() > STREAMING_THRESHOLD => {
740                    large_files.push(path.clone());
741                }
742                Ok(_) => {
743                    small_files.push(path.clone());
744                }
745                Err(_) => {
746                    // If we can't get size, treat as small
747                    small_files.push(path.clone());
748                }
749            }
750        }
751
752        // Process small files with standard ops
753        let mut all_results = if small_files.is_empty() {
754            Vec::new()
755        } else {
756            self.process(&small_files, standard_op, show_progress)?
757                .results
758        };
759
760        // Process large files with streaming ops
761        if !large_files.is_empty() {
762            let streaming_results = self
763                .process_streaming(&large_files, streaming_op, show_progress)?
764                .results;
765            all_results.extend(streaming_results);
766        }
767
768        // Restore original order
769        let file_order: Vec<&PathBuf> = files.iter().collect();
770        all_results.sort_by_key(|r| {
771            file_order
772                .iter()
773                .position(|&p| p == &r.path)
774                .unwrap_or(usize::MAX)
775        });
776
777        let elapsed_ms = start_time.elapsed().as_millis();
778        Ok(BatchResults::new(all_results, elapsed_ms))
779    }
780}