hedl_cli/
batch.rs

1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Batch processing for multiple HEDL files with parallel execution and progress reporting.
19//!
20//! This module provides efficient batch processing capabilities for operations on multiple
21//! HEDL files. It uses Rayon for parallel processing when beneficial and provides real-time
22//! progress reporting with detailed error tracking.
23//!
24//! # Features
25//!
26//! - **Parallel Processing**: Automatic parallelization using Rayon's work-stealing scheduler
27//! - **Progress Reporting**: Real-time progress with file counts and success/failure tracking
28//! - **Error Resilience**: Continues processing on errors, collecting all failures for reporting
29//! - **Performance Optimization**: Intelligent parallel/serial mode selection based on workload
30//! - **Type Safety**: Strongly typed operation definitions with compile-time guarantees
31//!
32//! # Architecture
33//!
34//! The batch processing system uses a functional architecture with:
35//! - Operation trait for extensible batch operations
36//! - Result aggregation with detailed error context
37//! - Atomic counters for thread-safe progress tracking
38//! - Zero-copy file path handling
39//!
40//! # Examples
41//!
42//! ```rust,no_run
43//! use hedl_cli::batch::{BatchProcessor, BatchConfig, ValidationOperation};
44//! use std::path::PathBuf;
45//!
46//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
47//! // Create a batch processor with default configuration
48//! let processor = BatchProcessor::new(BatchConfig::default());
49//!
50//! // Validate multiple files in parallel
51//! let files = vec![
52//!     PathBuf::from("file1.hedl"),
53//!     PathBuf::from("file2.hedl"),
54//!     PathBuf::from("file3.hedl"),
55//! ];
56//!
57//! let operation = ValidationOperation { strict: true };
58//! let results = processor.process(&files, operation, true)?;
59//!
60//! println!("Processed {} files, {} succeeded, {} failed",
61//!     results.total_files(),
62//!     results.success_count(),
63//!     results.failure_count()
64//! );
65//! # Ok(())
66//! # }
67//! ```
68//!
69//! # Performance Characteristics
70//!
71//! - **Small batches (< 10 files)**: Serial processing to avoid overhead
72//! - **Medium batches (10-100 files)**: Parallel with Rayon thread pool
73//! - **Large batches (> 100 files)**: Chunked parallel processing with progress updates
74//!
75//! # Thread Safety
76//!
77//! All progress tracking uses atomic operations for lock-free concurrent access.
78//! Operations are required to be Send + Sync for parallel execution.
79//!
80//! # Thread Pool Management
81//!
82//! The batch processor supports two thread pool strategies:
83//!
84//! ## Global Thread Pool (Default)
85//!
86//! When `max_threads` is `None`, operations use Rayon's global thread pool:
87//! - Zero overhead (no pool creation)
88//! - Shared across all Rayon operations in the process
89//! - Thread count typically matches CPU core count
90//!
91//! ## Local Thread Pool (Isolated)
92//!
93//! When `max_threads` is `Some(n)`, each operation creates an isolated local pool:
94//! - Guaranteed thread count of exactly `n` threads
95//! - No global state pollution
96//! - Supports concurrent operations with different configurations
97//! - Small creation overhead (~0.5-1ms) and memory cost (~2-8MB per thread)
98//!
99//! # Examples
100//!
101//! ```rust,no_run
102//! use hedl_cli::batch::{BatchProcessor, BatchConfig};
103//! use std::path::PathBuf;
104//!
105//! // Concurrent operations with different thread counts
106//! use std::thread;
107//!
108//! let files: Vec<PathBuf> = vec!["a.hedl".into(), "b.hedl".into()];
109//!
110//! let handle1 = thread::spawn(|| {
111//!     let processor = BatchProcessor::new(BatchConfig {
112//!         max_threads: Some(2),
113//!         ..Default::default()
114//!     });
115//!     // Uses 2 threads
116//! });
117//!
118//! let handle2 = thread::spawn(|| {
119//!     let processor = BatchProcessor::new(BatchConfig {
120//!         max_threads: Some(4),
121//!         ..Default::default()
122//!     });
123//!     // Uses 4 threads, isolated from handle1
124//! });
125//! ```
126
127use crate::error::CliError;
128use colored::Colorize;
129use rayon::prelude::*;
130use std::collections::HashSet;
131use std::path::{Path, PathBuf};
132use std::sync::atomic::{AtomicUsize, Ordering};
133use std::sync::Arc;
134use std::time::Instant;
135
136/// Configuration for batch processing operations.
137///
138/// Controls parallelization strategy, progress reporting, and error handling behavior.
139///
140/// # Examples
141///
142/// ```rust
143/// use hedl_cli::batch::BatchConfig;
144///
145/// // Default configuration (auto parallelization)
146/// let config = BatchConfig::default();
147///
148/// // Custom configuration
149/// let config = BatchConfig {
150///     parallel_threshold: 5,  // Parallelize if >= 5 files
151///     max_threads: Some(4),   // Use at most 4 threads
152///     progress_interval: 10,  // Update progress every 10 files
153///     verbose: true,          // Show detailed progress
154///     max_files: Some(10_000), // Limit to 10,000 files
155/// };
156/// ```
157#[derive(Debug, Clone)]
158pub struct BatchConfig {
159    /// Minimum number of files to trigger parallel processing.
160    ///
161    /// Files below this threshold are processed serially to avoid thread pool overhead.
162    /// Default: 10
163    pub parallel_threshold: usize,
164
165    /// Maximum number of threads to use for parallel processing.
166    ///
167    /// When set, creates a local thread pool isolated to this batch operation.
168    /// This ensures configuration always takes effect and prevents global state pollution.
169    ///
170    /// # Behavior
171    ///
172    /// - `None` (default): Uses Rayon's global thread pool (typically number of CPU cores)
173    /// - `Some(n)`: Creates a local thread pool with exactly `n` threads for this operation
174    ///
175    /// # Thread Pool Isolation
176    ///
177    /// Local thread pools provide complete isolation:
178    /// - No global state modification
179    /// - Concurrent batch operations can use different thread counts
180    /// - Configuration is guaranteed to take effect or error explicitly
181    /// - Thread pool lifetime matches the `process()` call duration
182    ///
183    /// # Performance Considerations
184    ///
185    /// Local thread pool creation has small overhead (~0.5-1ms) and memory cost (~2-8MB per thread).
186    /// For maximum performance with default configuration, leave as `None`.
187    ///
188    /// # Examples
189    ///
190    /// ```rust
191    /// use hedl_cli::batch::BatchConfig;
192    ///
193    /// // Default: uses global pool
194    /// let config = BatchConfig::default();
195    ///
196    /// // Custom: creates local pool with 4 threads
197    /// let config = BatchConfig {
198    ///     max_threads: Some(4),
199    ///     ..Default::default()
200    /// };
201    /// ```
202    ///
203    /// Default: None
204    pub max_threads: Option<usize>,
205
206    /// Number of files between progress updates.
207    ///
208    /// Progress is printed every N files processed. Set to 0 to disable.
209    /// Default: 1 (update after each file)
210    pub progress_interval: usize,
211
212    /// Enable verbose progress reporting.
213    ///
214    /// When true, shows file names and detailed status for each file.
215    /// Default: false
216    pub verbose: bool,
217
218    /// Maximum number of files allowed in a batch operation.
219    ///
220    /// This prevents resource exhaustion when processing very large file sets.
221    /// - `Some(n)`: Limit to n files (default: 10,000)
222    /// - `None`: No limit (use with caution)
223    ///
224    /// # Security
225    ///
226    /// Protects against:
227    /// - Memory exhaustion from storing millions of file paths
228    /// - File descriptor exhaustion from concurrent operations
229    /// - Excessive CPU time from unbounded processing
230    ///
231    /// # Configuration
232    ///
233    /// Can be overridden via:
234    /// - Environment variable: `HEDL_MAX_BATCH_FILES`
235    /// - CLI flag: `--max-files <N>`
236    /// - Programmatic: `BatchConfig { max_files: Some(n), .. }`
237    ///
238    /// # Examples
239    ///
240    /// ```rust
241    /// use hedl_cli::batch::BatchConfig;
242    ///
243    /// // Default limit (10,000 files)
244    /// let config = BatchConfig::default();
245    ///
246    /// // Custom limit
247    /// let config = BatchConfig {
248    ///     max_files: Some(50_000),
249    ///     ..Default::default()
250    /// };
251    ///
252    /// // Unlimited (use with caution)
253    /// let config = BatchConfig {
254    ///     max_files: None,
255    ///     ..Default::default()
256    /// };
257    /// ```
258    pub max_files: Option<usize>,
259}
260
261impl Default for BatchConfig {
262    fn default() -> Self {
263        Self {
264            parallel_threshold: 10,
265            max_threads: None,
266            progress_interval: 1,
267            verbose: false,
268            max_files: Some(get_max_batch_files()),
269        }
270    }
271}
272
273/// Get maximum batch files from environment variable or default.
274///
275/// Checks `HEDL_MAX_BATCH_FILES` environment variable. Falls back to
276/// `DEFAULT_MAX_BATCH_FILES` (10,000) if not set or invalid.
277///
278/// # Examples
279///
280/// ```bash
281/// export HEDL_MAX_BATCH_FILES=50000
282/// hedl batch-validate "*.hedl"
283/// ```
284fn get_max_batch_files() -> usize {
285    const DEFAULT_MAX_BATCH_FILES: usize = 10_000;
286
287    std::env::var("HEDL_MAX_BATCH_FILES")
288        .ok()
289        .and_then(|s| s.parse::<usize>().ok())
290        .unwrap_or(DEFAULT_MAX_BATCH_FILES)
291}
292
293/// Validate file count against configured limit.
294///
295/// # Arguments
296///
297/// * `file_count` - Number of files to process
298/// * `max_files` - Maximum allowed files (None = unlimited)
299///
300/// # Returns
301///
302/// * `Ok(())` - File count is within limit
303/// * `Err(CliError)` - File count exceeds limit
304///
305/// # Examples
306///
307/// ```rust
308/// use hedl_cli::batch::validate_file_count;
309///
310/// // Within limit
311/// assert!(validate_file_count(100, Some(1000)).is_ok());
312///
313/// // Exceeds limit
314/// assert!(validate_file_count(2000, Some(1000)).is_err());
315///
316/// // Unlimited
317/// assert!(validate_file_count(1_000_000, None).is_ok());
318/// ```
319pub fn validate_file_count(file_count: usize, max_files: Option<usize>) -> Result<(), CliError> {
320    if let Some(limit) = max_files {
321        if file_count > limit {
322            return Err(CliError::invalid_input(format!(
323                "File count ({file_count}) exceeds maximum limit ({limit}). \
324                 Consider:\n  \
325                 - Refining glob patterns to match fewer files\n  \
326                 - Using --max-files flag to increase limit\n  \
327                 - Setting HEDL_MAX_BATCH_FILES environment variable\n  \
328                 - Processing files in smaller batches"
329            )));
330        }
331    }
332    Ok(())
333}
334
335/// Warn if file count is large and suggest verbose mode.
336///
337/// Prints a warning when processing many files to inform user of operation scale.
338///
339/// # Arguments
340///
341/// * `file_count` - Number of files to process
342/// * `verbose` - Whether verbose mode is enabled
343///
344/// # Threshold
345///
346/// Warns if `file_count` >= 1000 and not already in verbose mode.
347pub fn warn_large_batch(file_count: usize, verbose: bool) {
348    const WARN_THRESHOLD: usize = 1_000;
349
350    if file_count >= WARN_THRESHOLD && !verbose {
351        eprintln!(
352            "{} Processing {} files. Consider using {} for progress updates.",
353            "Warning:".yellow().bold(),
354            file_count.to_string().bright_white(),
355            "--verbose".bright_cyan()
356        );
357    }
358}
359
360/// Result of processing a single file in a batch operation.
361///
362/// Contains the file path and either a success value or an error.
363///
364/// # Type Parameters
365///
366/// * `T` - The success type returned by the operation
367#[derive(Debug, Clone)]
368pub struct FileResult<T> {
369    /// The file path that was processed
370    pub path: PathBuf,
371    /// The result of processing (Ok or Err)
372    pub result: Result<T, CliError>,
373}
374
375impl<T> FileResult<T> {
376    /// Create a successful file result.
377    pub fn success(path: PathBuf, value: T) -> Self {
378        Self {
379            path,
380            result: Ok(value),
381        }
382    }
383
384    /// Create a failed file result.
385    #[must_use]
386    pub fn failure(path: PathBuf, error: CliError) -> Self {
387        Self {
388            path,
389            result: Err(error),
390        }
391    }
392
393    /// Check if the result is successful.
394    pub fn is_success(&self) -> bool {
395        self.result.is_ok()
396    }
397
398    /// Check if the result is a failure.
399    pub fn is_failure(&self) -> bool {
400        self.result.is_err()
401    }
402}
403
404/// Aggregated results from a batch processing operation.
405///
406/// Contains all individual file results and provides statistics.
407///
408/// # Type Parameters
409///
410/// * `T` - The success type returned by the operation
411#[derive(Debug, Clone)]
412pub struct BatchResults<T> {
413    /// Individual results for each processed file
414    pub results: Vec<FileResult<T>>,
415    /// Total processing time in milliseconds
416    pub elapsed_ms: u128,
417}
418
419impl<T> BatchResults<T> {
420    /// Create new batch results from a vector of file results.
421    #[must_use]
422    pub fn new(results: Vec<FileResult<T>>, elapsed_ms: u128) -> Self {
423        Self {
424            results,
425            elapsed_ms,
426        }
427    }
428
429    /// Get the total number of files processed.
430    #[must_use]
431    pub fn total_files(&self) -> usize {
432        self.results.len()
433    }
434
435    /// Get the number of successfully processed files.
436    #[must_use]
437    pub fn success_count(&self) -> usize {
438        self.results.iter().filter(|r| r.is_success()).count()
439    }
440
441    /// Get the number of failed files.
442    #[must_use]
443    pub fn failure_count(&self) -> usize {
444        self.results.iter().filter(|r| r.is_failure()).count()
445    }
446
447    /// Check if all files were processed successfully.
448    #[must_use]
449    pub fn all_succeeded(&self) -> bool {
450        self.results.iter().all(FileResult::is_success)
451    }
452
453    /// Check if any files failed.
454    #[must_use]
455    pub fn has_failures(&self) -> bool {
456        self.results.iter().any(FileResult::is_failure)
457    }
458
459    /// Get an iterator over successful results.
460    pub fn successes(&self) -> impl Iterator<Item = &FileResult<T>> {
461        self.results.iter().filter(|r| r.is_success())
462    }
463
464    /// Get an iterator over failed results.
465    pub fn failures(&self) -> impl Iterator<Item = &FileResult<T>> {
466        self.results.iter().filter(|r| r.is_failure())
467    }
468
469    /// Get processing throughput in files per second.
470    #[must_use]
471    pub fn throughput(&self) -> f64 {
472        if self.elapsed_ms == 0 {
473            0.0
474        } else {
475            (self.total_files() as f64) / (self.elapsed_ms as f64 / 1000.0)
476        }
477    }
478}
479
480/// Trait for batch operations on HEDL files.
481///
482/// Implement this trait to define custom batch operations. The operation must be
483/// thread-safe (Send + Sync) to support parallel processing.
484///
485/// # Type Parameters
486///
487/// * `Output` - The type returned on successful processing of a file
488///
489/// # Examples
490///
491/// ```rust
492/// use hedl_cli::batch::BatchOperation;
493/// use hedl_cli::error::CliError;
494/// use std::path::Path;
495///
496/// struct CountLinesOperation;
497///
498/// impl BatchOperation for CountLinesOperation {
499///     type Output = usize;
500///
501///     fn process_file(&self, path: &Path) -> Result<Self::Output, CliError> {
502///         let content = std::fs::read_to_string(path)
503///             .map_err(|e| CliError::io_error(path, e))?;
504///         Ok(content.lines().count())
505///     }
506///
507///     fn name(&self) -> &str {
508///         "count-lines"
509///     }
510/// }
511/// ```
512pub trait BatchOperation: Send + Sync {
513    /// The output type for successful processing
514    type Output: Send;
515
516    /// Process a single file and return the result.
517    ///
518    /// # Arguments
519    ///
520    /// * `path` - The path to the file to process
521    ///
522    /// # Returns
523    ///
524    /// * `Ok(Output)` - On successful processing
525    /// * `Err(CliError)` - On any error
526    ///
527    /// # Errors
528    ///
529    /// Should return appropriate `CliError` variants for different failure modes.
530    fn process_file(&self, path: &Path) -> Result<Self::Output, CliError>;
531
532    /// Get a human-readable name for this operation.
533    ///
534    /// Used for progress reporting and logging.
535    fn name(&self) -> &str;
536}
537
538/// Trait for streaming batch operations on HEDL files.
539///
540/// Unlike `BatchOperation` which loads entire files into memory,
541/// streaming operations process files incrementally with constant memory usage.
542/// This is ideal for processing large files (>100MB) or when memory is constrained.
543///
544/// # Memory Characteristics
545///
546/// - **Standard operations**: `O(num_threads` × `file_size`)
547/// - **Streaming operations**: `O(buffer_size` + `ID_set`) ≈ constant
548///
549/// # Type Parameters
550///
551/// * `Output` - The type returned on successful processing of a file
552///
553/// # Examples
554///
555/// ```rust
556/// use hedl_cli::batch::StreamingBatchOperation;
557/// use hedl_cli::error::CliError;
558/// use std::path::Path;
559///
560/// struct StreamingCountOperation;
561///
562/// impl StreamingBatchOperation for StreamingCountOperation {
563///     type Output = usize;
564///
565///     fn process_file_streaming(&self, path: &Path) -> Result<Self::Output, CliError> {
566///         use std::io::BufReader;
567///         use std::fs::File;
568///         use hedl_stream::StreamingParser;
569///
570///         let file = File::open(path).map_err(|e| CliError::io_error(path, e))?;
571///         let reader = BufReader::new(file);
572///         let parser = StreamingParser::new(reader)
573///             .map_err(|e| CliError::parse(e.to_string()))?;
574///
575///         let count = parser.filter(|e| {
576///             matches!(e, Ok(hedl_stream::NodeEvent::Node(_)))
577///         }).count();
578///
579///         Ok(count)
580///     }
581///
582///     fn name(&self) -> &str {
583///         "count-streaming"
584///     }
585/// }
586/// ```
587pub trait StreamingBatchOperation: Send + Sync {
588    /// The output type for successful processing
589    type Output: Send;
590
591    /// Process a file using streaming parser.
592    ///
593    /// # Arguments
594    ///
595    /// * `path` - File path to process
596    ///
597    /// # Returns
598    ///
599    /// * `Ok(Output)` - On successful processing
600    /// * `Err(CliError)` - On any error
601    ///
602    /// # Memory Guarantee
603    ///
604    /// Implementations should maintain O(1) memory usage regardless of file size,
605    /// processing the file incrementally using the streaming parser.
606    fn process_file_streaming(&self, path: &Path) -> Result<Self::Output, CliError>;
607
608    /// Get operation name for progress reporting
609    fn name(&self) -> &str;
610
611    /// Indicate if this operation can run in streaming mode.
612    ///
613    /// Some operations (like formatting) may require full document.
614    /// Default: true
615    fn supports_streaming(&self) -> bool {
616        true
617    }
618}
619
620/// Progress tracker for batch operations.
621///
622/// Uses atomic counters for lock-free concurrent progress tracking.
623#[derive(Debug)]
624struct ProgressTracker {
625    total: usize,
626    processed: AtomicUsize,
627    succeeded: AtomicUsize,
628    failed: AtomicUsize,
629    interval: usize,
630    verbose: bool,
631    start_time: Instant,
632}
633
634impl ProgressTracker {
635    /// Create a new progress tracker.
636    fn new(total: usize, interval: usize, verbose: bool) -> Self {
637        Self {
638            total,
639            processed: AtomicUsize::new(0),
640            succeeded: AtomicUsize::new(0),
641            failed: AtomicUsize::new(0),
642            interval,
643            verbose,
644            start_time: Instant::now(),
645        }
646    }
647
648    /// Record a successful file processing.
649    fn record_success(&self, path: &Path) {
650        let processed = self.processed.fetch_add(1, Ordering::Relaxed) + 1;
651        self.succeeded.fetch_add(1, Ordering::Relaxed);
652
653        if self.should_report(processed) {
654            self.report_progress(path, true);
655        }
656    }
657
658    /// Record a failed file processing.
659    fn record_failure(&self, path: &Path, error: &CliError) {
660        let processed = self.processed.fetch_add(1, Ordering::Relaxed) + 1;
661        self.failed.fetch_add(1, Ordering::Relaxed);
662
663        if self.verbose {
664            eprintln!("{} {} - {}", "✗".red().bold(), path.display(), error);
665        }
666
667        if self.should_report(processed) {
668            self.report_progress(path, false);
669        }
670    }
671
672    /// Check if progress should be reported for this count.
673    fn should_report(&self, processed: usize) -> bool {
674        self.interval > 0 && (processed % self.interval == 0 || processed == self.total)
675    }
676
677    /// Report current progress to stderr.
678    fn report_progress(&self, current_file: &Path, success: bool) {
679        let processed = self.processed.load(Ordering::Relaxed);
680        let succeeded = self.succeeded.load(Ordering::Relaxed);
681        let failed = self.failed.load(Ordering::Relaxed);
682        let elapsed = self.start_time.elapsed();
683        let rate = processed as f64 / elapsed.as_secs_f64();
684
685        if self.verbose {
686            let status = if success {
687                "✓".green().bold()
688            } else {
689                "✗".red().bold()
690            };
691            eprintln!(
692                "{} [{}/{}] {} ({:.1} files/s)",
693                status,
694                processed,
695                self.total,
696                current_file.display(),
697                rate
698            );
699        } else {
700            eprintln!(
701                "Progress: [{}/{}] {} succeeded, {} failed ({:.1} files/s)",
702                processed, self.total, succeeded, failed, rate
703            );
704        }
705    }
706
707    /// Print final summary.
708    fn print_summary(&self, operation_name: &str) {
709        let processed = self.processed.load(Ordering::Relaxed);
710        let succeeded = self.succeeded.load(Ordering::Relaxed);
711        let failed = self.failed.load(Ordering::Relaxed);
712        let elapsed = self.start_time.elapsed();
713
714        println!();
715        println!("{}", "═".repeat(60).bright_blue());
716        println!(
717            "{} {}",
718            "Batch Operation:".bright_blue().bold(),
719            operation_name.bright_white()
720        );
721        println!("{}", "═".repeat(60).bright_blue());
722        println!(
723            "  {} {}",
724            "Total files:".bright_cyan(),
725            processed.to_string().bright_white()
726        );
727        println!(
728            "  {} {}",
729            "Succeeded:".green().bold(),
730            succeeded.to_string().bright_white()
731        );
732        println!(
733            "  {} {}",
734            "Failed:".red().bold(),
735            failed.to_string().bright_white()
736        );
737        println!(
738            "  {} {:.2}s",
739            "Elapsed:".bright_cyan(),
740            elapsed.as_secs_f64()
741        );
742        println!(
743            "  {} {:.1} files/s",
744            "Throughput:".bright_cyan(),
745            processed as f64 / elapsed.as_secs_f64()
746        );
747        println!("{}", "═".repeat(60).bright_blue());
748    }
749}
750
751/// High-performance batch processor for HEDL files.
752///
753/// Orchestrates parallel or serial processing based on configuration and workload.
754/// Provides progress tracking and comprehensive error collection.
755///
756/// # Thread Safety
757///
758/// `BatchProcessor` is thread-safe and can be shared across threads via Arc.
759///
760/// # Examples
761///
762/// ```rust,no_run
763/// use hedl_cli::batch::{BatchProcessor, BatchConfig, ValidationOperation};
764/// use std::path::PathBuf;
765///
766/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
767/// let processor = BatchProcessor::new(BatchConfig {
768///     parallel_threshold: 5,
769///     verbose: true,
770///     ..Default::default()
771/// });
772///
773/// let files: Vec<PathBuf> = vec![
774///     "file1.hedl".into(),
775///     "file2.hedl".into(),
776/// ];
777///
778/// let results = processor.process(
779///     &files,
780///     ValidationOperation { strict: false },
781///     true,
782/// )?;
783///
784/// if results.has_failures() {
785///     eprintln!("Some files failed validation");
786///     for failure in results.failures() {
787///         eprintln!("  - {}: {:?}", failure.path.display(), failure.result);
788///     }
789/// }
790/// # Ok(())
791/// # }
792/// ```
793#[derive(Debug, Clone)]
794pub struct BatchProcessor {
795    config: BatchConfig,
796}
797
798impl BatchProcessor {
799    /// Create a new batch processor with the given configuration.
800    #[must_use]
801    pub fn new(config: BatchConfig) -> Self {
802        Self { config }
803    }
804
805    /// Create a batch processor with default configuration.
806    #[must_use]
807    pub fn default_config() -> Self {
808        Self::new(BatchConfig::default())
809    }
810
811    /// Process multiple files with the given operation.
812    ///
813    /// Automatically selects parallel or serial processing based on configuration
814    /// and file count. Provides progress reporting and collects all results.
815    ///
816    /// # Arguments
817    ///
818    /// * `files` - Slice of file paths to process
819    /// * `operation` - The operation to perform on each file
820    /// * `show_progress` - Whether to show progress updates
821    ///
822    /// # Returns
823    ///
824    /// * `Ok(BatchResults)` - Successfully processed all files (individual failures collected in results)
825    /// * `Err(CliError::ThreadPoolError)` - Failed to create thread pool with requested configuration
826    ///
827    /// # Thread Pool Selection
828    ///
829    /// The method uses different thread pool strategies based on configuration:
830    ///
831    /// 1. **Serial Processing**: If `files.len() < parallel_threshold`, processes serially (no thread pool)
832    /// 2. **Local Thread Pool**: If `max_threads` is `Some(n)`, creates isolated pool with `n` threads
833    /// 3. **Global Thread Pool**: If `max_threads` is `None`, uses Rayon's global pool
834    ///
835    /// # Error Handling
836    ///
837    /// Thread pool creation can fail if:
838    /// - `max_threads` is 0 (invalid configuration)
839    /// - System cannot allocate thread resources
840    /// - Thread stack allocation fails
841    ///
842    /// Individual file processing errors are collected in `BatchResults`, not returned as errors.
843    ///
844    /// # Performance
845    ///
846    /// - Serial processing for small batches to avoid thread pool overhead
847    /// - Local thread pool: ~0.5-1ms creation overhead, ~2-8MB per thread
848    /// - Global thread pool: zero overhead
849    /// - Lock-free progress tracking using atomic counters
850    ///
851    /// # Examples
852    ///
853    /// ```rust,no_run
854    /// use hedl_cli::batch::{BatchProcessor, BatchConfig, FormatOperation};
855    /// use hedl_cli::error::CliError;
856    /// use std::path::PathBuf;
857    ///
858    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
859    /// let processor = BatchProcessor::new(BatchConfig {
860    ///     max_threads: Some(4),
861    ///     ..Default::default()
862    /// });
863    ///
864    /// let files = vec![PathBuf::from("a.hedl"), PathBuf::from("b.hedl")];
865    ///
866    /// match processor.process(
867    ///     &files,
868    ///     FormatOperation {
869    ///         check: false,
870    ///         ditto: true,
871    ///         with_counts: false,
872    ///     },
873    ///     true,
874    /// ) {
875    ///     Ok(results) => {
876    ///         println!("Formatted {} files", results.success_count());
877    ///         if results.has_failures() {
878    ///             // Handle individual file failures
879    ///         }
880    ///     }
881    ///     Err(CliError::ThreadPoolError { message, requested_threads }) => {
882    ///         eprintln!("Failed to create thread pool: {}", message);
883    ///     }
884    ///     Err(e) => {
885    ///         eprintln!("Unexpected error: {}", e);
886    ///     }
887    /// }
888    /// # Ok(())
889    /// # }
890    /// ```
891    pub fn process<O>(
892        &self,
893        files: &[PathBuf],
894        operation: O,
895        show_progress: bool,
896    ) -> Result<BatchResults<O::Output>, CliError>
897    where
898        O: BatchOperation,
899    {
900        let start_time = Instant::now();
901
902        if files.is_empty() {
903            // Warn the user when no files match the provided patterns.
904            // Only show warning when progress is enabled, as this indicates
905            // the user expects feedback. Silent mode implies automated/scripted usage.
906            if show_progress {
907                eprintln!(
908                    "{} No files matched the provided patterns",
909                    "Warning:".yellow().bold()
910                );
911                eprintln!(
912                    "{} Check that patterns are correct and files exist",
913                    "Hint:".cyan()
914                );
915            }
916            return Ok(BatchResults::new(vec![], 0));
917        }
918
919        let results = if files.len() < self.config.parallel_threshold {
920            // Serial processing for small batches (no thread pool needed)
921            self.process_serial(files, &operation, show_progress)
922        } else if let Some(max_threads) = self.config.max_threads {
923            // Use local thread pool with specified thread count
924            self.process_with_local_pool(files, &operation, show_progress, max_threads)?
925        } else {
926            // Use default global thread pool (Rayon's default)
927            self.process_parallel(files, &operation, show_progress)
928        };
929
930        let elapsed_ms = start_time.elapsed().as_millis();
931
932        Ok(BatchResults::new(results, elapsed_ms))
933    }
934
935    /// Process files using a local thread pool with specified thread count.
936    ///
937    /// Creates an isolated Rayon thread pool that doesn't affect global state.
938    /// The thread pool is created for this operation and destroyed when complete.
939    ///
940    /// # Arguments
941    ///
942    /// * `files` - Slice of file paths to process
943    /// * `operation` - The operation to perform on each file
944    /// * `show_progress` - Whether to show progress updates
945    /// * `num_threads` - The number of threads to use
946    ///
947    /// # Returns
948    ///
949    /// * `Ok(Vec<FileResult>)` - Successfully processed files with local pool
950    /// * `Err(CliError::ThreadPoolError)` - Failed to create thread pool
951    ///
952    /// # Errors
953    ///
954    /// Returns `ThreadPoolError` if:
955    /// - `num_threads` is 0 (invalid configuration)
956    /// - System cannot allocate thread resources
957    /// - Thread stack allocation fails
958    ///
959    /// # Performance
960    ///
961    /// - Thread pool creation: ~0.5-1ms overhead
962    /// - Memory cost: ~2-8MB per thread (OS thread stacks)
963    /// - Pool lifetime: Duration of this method call
964    fn process_with_local_pool<O>(
965        &self,
966        files: &[PathBuf],
967        operation: &O,
968        show_progress: bool,
969        num_threads: usize,
970    ) -> Result<Vec<FileResult<O::Output>>, CliError>
971    where
972        O: BatchOperation,
973    {
974        // Validate thread count - 0 threads is invalid
975        if num_threads == 0 {
976            return Err(CliError::thread_pool_error(
977                "Cannot create thread pool with 0 threads".to_string(),
978                num_threads,
979            ));
980        }
981
982        // Build a local thread pool
983        let pool = rayon::ThreadPoolBuilder::new()
984            .num_threads(num_threads)
985            .build()
986            .map_err(|e| {
987                CliError::thread_pool_error(
988                    format!("Failed to create thread pool with {num_threads} threads: {e}"),
989                    num_threads,
990                )
991            })?;
992
993        // Run parallel processing within the local pool
994        let results = pool.install(|| self.process_parallel(files, operation, show_progress));
995
996        Ok(results)
997    }
998
999    /// Process files serially (single-threaded).
1000    fn process_serial<O>(
1001        &self,
1002        files: &[PathBuf],
1003        operation: &O,
1004        show_progress: bool,
1005    ) -> Vec<FileResult<O::Output>>
1006    where
1007        O: BatchOperation,
1008    {
1009        let tracker = if show_progress {
1010            Some(ProgressTracker::new(
1011                files.len(),
1012                self.config.progress_interval,
1013                self.config.verbose,
1014            ))
1015        } else {
1016            None
1017        };
1018
1019        let results: Vec<FileResult<O::Output>> = files
1020            .iter()
1021            .map(|path| {
1022                let result = operation.process_file(path);
1023
1024                if let Some(ref t) = tracker {
1025                    match &result {
1026                        Ok(_) => t.record_success(path),
1027                        Err(e) => t.record_failure(path, e),
1028                    }
1029                }
1030
1031                FileResult {
1032                    path: path.clone(),
1033                    result: result.map_err(|e| e.clone()),
1034                }
1035            })
1036            .collect();
1037
1038        if show_progress {
1039            if let Some(tracker) = tracker {
1040                tracker.print_summary(operation.name());
1041            }
1042        }
1043
1044        results
1045    }
1046
1047    /// Process files in parallel using Rayon.
1048    fn process_parallel<O>(
1049        &self,
1050        files: &[PathBuf],
1051        operation: &O,
1052        show_progress: bool,
1053    ) -> Vec<FileResult<O::Output>>
1054    where
1055        O: BatchOperation,
1056    {
1057        let tracker = if show_progress {
1058            Some(Arc::new(ProgressTracker::new(
1059                files.len(),
1060                self.config.progress_interval,
1061                self.config.verbose,
1062            )))
1063        } else {
1064            None
1065        };
1066
1067        let results: Vec<FileResult<O::Output>> = files
1068            .par_iter()
1069            .map(|path| {
1070                let result = operation.process_file(path);
1071
1072                if let Some(ref t) = tracker {
1073                    match &result {
1074                        Ok(_) => t.record_success(path),
1075                        Err(e) => t.record_failure(path, e),
1076                    }
1077                }
1078
1079                FileResult {
1080                    path: path.clone(),
1081                    result: result.map_err(|e| e.clone()),
1082                }
1083            })
1084            .collect();
1085
1086        if show_progress {
1087            if let Some(tracker) = tracker {
1088                tracker.print_summary(operation.name());
1089            }
1090        }
1091
1092        results
1093    }
1094
1095    /// Process files using streaming operations for memory efficiency.
1096    ///
1097    /// This method uses the streaming parser from `hedl-stream` to process files
1098    /// with constant memory usage regardless of file size. Ideal for:
1099    /// - Files larger than 100MB
1100    /// - Memory-constrained environments
1101    /// - Processing thousands of files
1102    ///
1103    /// # Arguments
1104    ///
1105    /// * `files` - Slice of file paths to process
1106    /// * `operation` - The streaming operation to perform
1107    /// * `show_progress` - Whether to show progress updates
1108    ///
1109    /// # Returns
1110    ///
1111    /// * `Ok(BatchResults)` - Always succeeds and collects all individual results
1112    /// * `Err(CliError)` - Only on catastrophic failures
1113    ///
1114    /// # Memory Usage
1115    ///
1116    /// Peak memory = `buffer_size` (8KB) × `num_threads` + ID tracking set
1117    ///
1118    /// # Examples
1119    ///
1120    /// ```rust,no_run
1121    /// use hedl_cli::batch::{BatchProcessor, StreamingValidationOperation, BatchConfig};
1122    /// use std::path::PathBuf;
1123    ///
1124    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1125    /// let processor = BatchProcessor::default_config();
1126    /// let files = vec![PathBuf::from("large-file.hedl")];
1127    /// let operation = StreamingValidationOperation { strict: false };
1128    ///
1129    /// let results = processor.process_streaming(&files, operation, true)?;
1130    /// println!("Processed {} files with constant memory", results.success_count());
1131    /// # Ok(())
1132    /// # }
1133    /// ```
1134    pub fn process_streaming<O>(
1135        &self,
1136        files: &[PathBuf],
1137        operation: O,
1138        show_progress: bool,
1139    ) -> Result<BatchResults<O::Output>, CliError>
1140    where
1141        O: StreamingBatchOperation,
1142    {
1143        let start_time = Instant::now();
1144
1145        if files.is_empty() {
1146            return Ok(BatchResults::new(vec![], 0));
1147        }
1148
1149        // Configure thread pool if max_threads is specified
1150        if let Some(max_threads) = self.config.max_threads {
1151            rayon::ThreadPoolBuilder::new()
1152                .num_threads(max_threads)
1153                .build_global()
1154                .ok(); // Ignore error if already initialized
1155        }
1156
1157        let results = if files.len() < self.config.parallel_threshold {
1158            self.process_streaming_serial(files, &operation, show_progress)
1159        } else {
1160            self.process_streaming_parallel(files, &operation, show_progress)
1161        };
1162
1163        let elapsed_ms = start_time.elapsed().as_millis();
1164        Ok(BatchResults::new(results, elapsed_ms))
1165    }
1166
1167    /// Process files serially using streaming.
1168    fn process_streaming_serial<O>(
1169        &self,
1170        files: &[PathBuf],
1171        operation: &O,
1172        show_progress: bool,
1173    ) -> Vec<FileResult<O::Output>>
1174    where
1175        O: StreamingBatchOperation,
1176    {
1177        let tracker = if show_progress {
1178            Some(ProgressTracker::new(
1179                files.len(),
1180                self.config.progress_interval,
1181                self.config.verbose,
1182            ))
1183        } else {
1184            None
1185        };
1186
1187        let results: Vec<FileResult<O::Output>> = files
1188            .iter()
1189            .map(|path| {
1190                let result = operation.process_file_streaming(path);
1191
1192                if let Some(ref t) = tracker {
1193                    match &result {
1194                        Ok(_) => t.record_success(path),
1195                        Err(e) => t.record_failure(path, e),
1196                    }
1197                }
1198
1199                FileResult {
1200                    path: path.clone(),
1201                    result: result.map_err(|e| e.clone()),
1202                }
1203            })
1204            .collect();
1205
1206        if show_progress {
1207            if let Some(tracker) = tracker {
1208                tracker.print_summary(operation.name());
1209            }
1210        }
1211
1212        results
1213    }
1214
1215    /// Process files in parallel using streaming.
1216    fn process_streaming_parallel<O>(
1217        &self,
1218        files: &[PathBuf],
1219        operation: &O,
1220        show_progress: bool,
1221    ) -> Vec<FileResult<O::Output>>
1222    where
1223        O: StreamingBatchOperation,
1224    {
1225        let tracker = if show_progress {
1226            Some(Arc::new(ProgressTracker::new(
1227                files.len(),
1228                self.config.progress_interval,
1229                self.config.verbose,
1230            )))
1231        } else {
1232            None
1233        };
1234
1235        let results: Vec<FileResult<O::Output>> = files
1236            .par_iter()
1237            .map(|path| {
1238                let result = operation.process_file_streaming(path);
1239
1240                if let Some(ref t) = tracker {
1241                    match &result {
1242                        Ok(_) => t.record_success(path),
1243                        Err(e) => t.record_failure(path, e),
1244                    }
1245                }
1246
1247                FileResult {
1248                    path: path.clone(),
1249                    result: result.map_err(|e| e.clone()),
1250                }
1251            })
1252            .collect();
1253
1254        if show_progress {
1255            if let Some(tracker) = tracker {
1256                tracker.print_summary(operation.name());
1257            }
1258        }
1259
1260        results
1261    }
1262
1263    /// Automatically choose between standard and streaming based on file size.
1264    ///
1265    /// Files larger than 100MB use streaming mode for memory efficiency,
1266    /// while smaller files use standard mode for better performance.
1267    ///
1268    /// # Arguments
1269    ///
1270    /// * `files` - Slice of file paths to process
1271    /// * `standard_op` - Standard operation for small files
1272    /// * `streaming_op` - Streaming operation for large files
1273    /// * `show_progress` - Whether to show progress updates
1274    ///
1275    /// # Returns
1276    ///
1277    /// * `Ok(BatchResults)` - Combined results from both modes
1278    /// * `Err(CliError)` - On catastrophic failures
1279    ///
1280    /// # Examples
1281    ///
1282    /// ```rust,no_run
1283    /// use hedl_cli::batch::{BatchProcessor, ValidationOperation, StreamingValidationOperation};
1284    /// use std::path::PathBuf;
1285    ///
1286    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1287    /// let processor = BatchProcessor::default_config();
1288    /// let files = vec![
1289    ///     PathBuf::from("small.hedl"),
1290    ///     PathBuf::from("large-200mb.hedl"),
1291    /// ];
1292    ///
1293    /// let results = processor.process_auto(
1294    ///     &files,
1295    ///     ValidationOperation { strict: false },
1296    ///     StreamingValidationOperation { strict: false },
1297    ///     true,
1298    /// )?;
1299    /// println!("Processed {} files", results.results.len());
1300    /// # Ok(())
1301    /// # }
1302    /// ```
1303    pub fn process_auto<O, SO>(
1304        &self,
1305        files: &[PathBuf],
1306        standard_op: O,
1307        streaming_op: SO,
1308        show_progress: bool,
1309    ) -> Result<BatchResults<O::Output>, CliError>
1310    where
1311        O: BatchOperation<Output = SO::Output>,
1312        SO: StreamingBatchOperation,
1313    {
1314        const STREAMING_THRESHOLD: u64 = 100 * 1024 * 1024; // 100MB
1315
1316        if files.is_empty() {
1317            return Ok(BatchResults::new(vec![], 0));
1318        }
1319
1320        let start_time = Instant::now();
1321
1322        // Partition files by size
1323        let mut small_files = Vec::new();
1324        let mut large_files = Vec::new();
1325
1326        for path in files {
1327            match std::fs::metadata(path) {
1328                Ok(meta) if meta.len() > STREAMING_THRESHOLD => {
1329                    large_files.push(path.clone());
1330                }
1331                Ok(_) => {
1332                    small_files.push(path.clone());
1333                }
1334                Err(_) => {
1335                    // If we can't get size, treat as small
1336                    small_files.push(path.clone());
1337                }
1338            }
1339        }
1340
1341        // Process small files with standard ops
1342        let mut all_results = if small_files.is_empty() {
1343            Vec::new()
1344        } else {
1345            self.process(&small_files, standard_op, show_progress)?
1346                .results
1347        };
1348
1349        // Process large files with streaming ops
1350        if !large_files.is_empty() {
1351            let streaming_results = self
1352                .process_streaming(&large_files, streaming_op, show_progress)?
1353                .results;
1354            all_results.extend(streaming_results);
1355        }
1356
1357        // Restore original order
1358        let file_order: Vec<&PathBuf> = files.iter().collect();
1359        all_results.sort_by_key(|r| {
1360            file_order
1361                .iter()
1362                .position(|&p| p == &r.path)
1363                .unwrap_or(usize::MAX)
1364        });
1365
1366        let elapsed_ms = start_time.elapsed().as_millis();
1367        Ok(BatchResults::new(all_results, elapsed_ms))
1368    }
1369}
1370
1371// ============================================================================
1372// Standard Operations
1373// ============================================================================
1374
1375/// Batch validation operation.
1376///
1377/// Validates multiple HEDL files in parallel, checking syntax and optionally
1378/// enforcing strict reference resolution.
1379#[derive(Debug, Clone)]
1380pub struct ValidationOperation {
1381    /// Enable strict reference validation
1382    pub strict: bool,
1383}
1384
1385impl BatchOperation for ValidationOperation {
1386    type Output = ValidationStats;
1387
1388    fn process_file(&self, path: &Path) -> Result<Self::Output, CliError> {
1389        use hedl_core::{parse_with_limits, Item, Node, ParseOptions, ReferenceMode};
1390
1391        let content = std::fs::read_to_string(path).map_err(|e| CliError::io_error(path, e))?;
1392
1393        let options = ParseOptions {
1394            reference_mode: if self.strict {
1395                ReferenceMode::Strict
1396            } else {
1397                ReferenceMode::Lenient
1398            },
1399            ..ParseOptions::default()
1400        };
1401
1402        let doc = parse_with_limits(content.as_bytes(), options)
1403            .map_err(|e| CliError::parse(e.to_string()))?;
1404
1405        // Collect statistics from the parsed document
1406        let mut stats = ValidationStats::new();
1407
1408        // Get version from document metadata
1409        stats.version = format!("{}.{}", doc.version.0, doc.version.1);
1410
1411        // Recursive helper to count nodes
1412        fn count_node(node: &Node, stats: &mut ValidationStats) {
1413            stats.node_count += 1;
1414            stats.field_count += node.fields.len();
1415            let full_id = format!("{}:{}", node.type_name, node.id);
1416            stats.seen_ids.insert(full_id);
1417
1418            // Count children recursively
1419            if let Some(ref children) = node.children {
1420                for child_nodes in children.values() {
1421                    for child in child_nodes {
1422                        count_node(child, stats);
1423                    }
1424                }
1425            }
1426        }
1427
1428        // Recursive helper to traverse items
1429        fn traverse_item(item: &Item, stats: &mut ValidationStats) {
1430            match item {
1431                Item::List(list) => {
1432                    stats.list_count += 1;
1433                    for node in &list.rows {
1434                        count_node(node, stats);
1435                    }
1436                }
1437                Item::Object(obj) => {
1438                    for child_item in obj.values() {
1439                        traverse_item(child_item, stats);
1440                    }
1441                }
1442                Item::Scalar(_) => {
1443                    // Scalars don't contribute to node counts
1444                }
1445            }
1446        }
1447
1448        // Traverse all items in the document root
1449        for item in doc.root.values() {
1450            traverse_item(item, &mut stats);
1451        }
1452
1453        Ok(stats)
1454    }
1455
1456    fn name(&self) -> &'static str {
1457        "validate"
1458    }
1459}
1460
1461/// Batch format operation.
1462///
1463/// Formats multiple HEDL files to canonical form, optionally checking if files
1464/// are already canonical.
1465#[derive(Debug, Clone)]
1466pub struct FormatOperation {
1467    /// Only check if files are canonical (don't write)
1468    pub check: bool,
1469    /// Use ditto optimization
1470    pub ditto: bool,
1471    /// Add count hints to matrix lists
1472    pub with_counts: bool,
1473}
1474
1475impl BatchOperation for FormatOperation {
1476    type Output = String;
1477
1478    fn process_file(&self, path: &Path) -> Result<Self::Output, CliError> {
1479        use hedl_c14n::{canonicalize_with_config, CanonicalConfig};
1480        use hedl_core::parse;
1481
1482        let content = std::fs::read_to_string(path).map_err(|e| CliError::io_error(path, e))?;
1483
1484        let mut doc = parse(content.as_bytes()).map_err(|e| CliError::parse(e.to_string()))?;
1485
1486        // Add count hints if requested
1487        if self.with_counts {
1488            add_count_hints(&mut doc);
1489        }
1490
1491        let config = CanonicalConfig::new().with_ditto(self.ditto);
1492
1493        let canonical = canonicalize_with_config(&doc, &config)
1494            .map_err(|e| CliError::canonicalization(e.to_string()))?;
1495
1496        if self.check && canonical != content {
1497            return Err(CliError::NotCanonical);
1498        }
1499
1500        Ok(canonical)
1501    }
1502
1503    fn name(&self) -> &str {
1504        if self.check {
1505            "format-check"
1506        } else {
1507            "format"
1508        }
1509    }
1510}
1511
1512/// Batch lint operation.
1513///
1514/// Lints multiple HEDL files for best practices and common issues.
1515#[derive(Debug, Clone)]
1516pub struct LintOperation {
1517    /// Treat warnings as errors
1518    pub warn_error: bool,
1519}
1520
1521impl BatchOperation for LintOperation {
1522    type Output = Vec<String>;
1523
1524    fn process_file(&self, path: &Path) -> Result<Self::Output, CliError> {
1525        use hedl_core::parse;
1526        use hedl_lint::lint;
1527
1528        let content = std::fs::read_to_string(path).map_err(|e| CliError::io_error(path, e))?;
1529
1530        let doc = parse(content.as_bytes()).map_err(|e| CliError::parse(e.to_string()))?;
1531
1532        let diagnostics = lint(&doc);
1533
1534        if self.warn_error && !diagnostics.is_empty() {
1535            return Err(CliError::LintErrors);
1536        }
1537
1538        Ok(diagnostics
1539            .iter()
1540            .map(std::string::ToString::to_string)
1541            .collect())
1542    }
1543
1544    fn name(&self) -> &'static str {
1545        "lint"
1546    }
1547}
1548
1549// ============================================================================
1550// Streaming Operations
1551// ============================================================================
1552
1553/// Statistics collected during streaming validation.
1554///
1555/// Provides detailed statistics about the parsed document including
1556/// entity counts, field counts, and ID tracking for reference validation.
1557#[derive(Debug, Clone, Default)]
1558pub struct ValidationStats {
1559    /// HEDL version string
1560    pub version: String,
1561    /// Number of lists encountered
1562    pub list_count: usize,
1563    /// Total number of nodes processed
1564    pub node_count: usize,
1565    /// Total number of fields across all nodes
1566    pub field_count: usize,
1567    /// Set of seen IDs for strict reference validation (type:id format)
1568    pub seen_ids: HashSet<String>,
1569}
1570
1571impl ValidationStats {
1572    /// Create new empty validation statistics
1573    #[must_use]
1574    pub fn new() -> Self {
1575        Self::default()
1576    }
1577}
1578
1579/// Streaming validation operation for memory-efficient validation of large files.
1580///
1581/// Uses the streaming parser from `hedl-stream` to validate files with O(1) memory
1582/// usage regardless of file size. Ideal for:
1583/// - Files larger than 100MB
1584/// - Validating thousands of files with limited RAM
1585/// - Container environments with memory limits
1586///
1587/// # Memory Profile
1588///
1589/// - **Input**: O(1) - buffer size only (~8KB)
1590/// - **Working**: `O(n_ids)` - seen ID set for strict validation
1591/// - **Output**: O(1) - small statistics struct
1592/// - **Peak**: ~8KB + ID set size (vs. full file size in standard mode)
1593///
1594/// # Examples
1595///
1596/// ```rust,no_run
1597/// use hedl_cli::batch::{BatchProcessor, StreamingValidationOperation};
1598/// use std::path::PathBuf;
1599///
1600/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1601/// let processor = BatchProcessor::default_config();
1602/// let files = vec![PathBuf::from("large-file.hedl")];
1603///
1604/// let operation = StreamingValidationOperation { strict: false };
1605/// let results = processor.process_streaming(&files, operation, true)?;
1606///
1607/// println!("Validated {} files with constant memory", results.success_count());
1608/// # Ok(())
1609/// # }
1610/// ```
1611#[derive(Debug, Clone)]
1612pub struct StreamingValidationOperation {
1613    /// Enable strict reference validation
1614    pub strict: bool,
1615}
1616
1617impl StreamingBatchOperation for StreamingValidationOperation {
1618    type Output = ValidationStats;
1619
1620    fn process_file_streaming(&self, path: &Path) -> Result<Self::Output, CliError> {
1621        use hedl_stream::{NodeEvent, StreamError, StreamingParser};
1622        use std::fs::File;
1623        use std::io::BufReader;
1624
1625        let file = File::open(path).map_err(|e| CliError::io_error(path, e))?;
1626        let reader = BufReader::with_capacity(8192, file);
1627
1628        let parser = StreamingParser::new(reader)
1629            .map_err(|e: StreamError| CliError::parse(e.to_string()))?;
1630
1631        let mut stats = ValidationStats::new();
1632        let mut _current_type = String::new();
1633
1634        // Process events incrementally
1635        for event in parser {
1636            let event = event.map_err(|e: StreamError| CliError::parse(e.to_string()))?;
1637
1638            match event {
1639                NodeEvent::Header(info) => {
1640                    // Validate version exists
1641                    let version_str = format!("{}.{}", info.version.0, info.version.1);
1642                    if version_str.is_empty() {
1643                        return Err(CliError::parse("Missing VERSION".to_string()));
1644                    }
1645                    stats.version = version_str;
1646                }
1647                NodeEvent::ListStart { type_name, .. } => {
1648                    stats.list_count += 1;
1649                    _current_type = type_name;
1650                }
1651                NodeEvent::Node(node) => {
1652                    stats.node_count += 1;
1653                    stats.field_count += node.fields.len();
1654
1655                    // Track IDs for strict mode validation
1656                    let full_id = format!("{}:{}", node.type_name, node.id);
1657
1658                    if self.strict {
1659                        // In strict mode, validate references
1660                        // For now, just track IDs - full reference validation
1661                        // would require accumulating references and validating at end
1662                        stats.seen_ids.insert(full_id);
1663                    } else {
1664                        stats.seen_ids.insert(full_id);
1665                    }
1666                }
1667                NodeEvent::ListEnd { .. } => {
1668                    // List validation complete
1669                }
1670                NodeEvent::Scalar { .. } => {
1671                    // Scalar validation - no action needed
1672                }
1673                NodeEvent::ObjectStart { .. } => {
1674                    // Object start - no action needed
1675                }
1676                NodeEvent::ObjectEnd { .. } => {
1677                    // Object end - no action needed
1678                }
1679                NodeEvent::EndOfDocument => {
1680                    // Document complete
1681                    break;
1682                }
1683            }
1684        }
1685
1686        Ok(stats)
1687    }
1688
1689    fn name(&self) -> &'static str {
1690        "validate-streaming"
1691    }
1692
1693    fn supports_streaming(&self) -> bool {
1694        true
1695    }
1696}
1697
1698// ============================================================================
1699// Helper Functions for Count Hints
1700// ============================================================================
1701
1702/// Recursively add count hints to all matrix lists in the document
1703fn add_count_hints(doc: &mut hedl_core::Document) {
1704    for item in doc.root.values_mut() {
1705        add_count_hints_to_item(item);
1706    }
1707}
1708
1709/// Recursively add count hints to an item
1710fn add_count_hints_to_item(item: &mut hedl_core::Item) {
1711    use hedl_core::Item;
1712
1713    match item {
1714        Item::List(list) => {
1715            // Set count hint based on actual row count
1716            list.count_hint = Some(list.rows.len());
1717
1718            // Recursively add child counts to each node
1719            for node in &mut list.rows {
1720                add_child_count_to_node(node);
1721            }
1722        }
1723        Item::Object(map) => {
1724            // Recursively process nested objects
1725            for nested_item in map.values_mut() {
1726                add_count_hints_to_item(nested_item);
1727            }
1728        }
1729        Item::Scalar(_) => {
1730            // Scalars don't have matrix lists
1731        }
1732    }
1733}
1734
1735/// Recursively set `child_count` on nodes that have children
1736fn add_child_count_to_node(node: &mut hedl_core::Node) {
1737    // Calculate total number of direct children across all child types
1738    let total_children: usize = node
1739        .children()
1740        .map_or(0, |c| c.values().map(std::vec::Vec::len).sum());
1741
1742    if total_children > 0 {
1743        node.child_count = total_children.min(u16::MAX as usize) as u16;
1744
1745        // Recursively process all child nodes
1746        if let Some(children) = node.children_mut() {
1747            for child_list in children.values_mut() {
1748                for child_node in child_list {
1749                    add_child_count_to_node(child_node);
1750                }
1751            }
1752        }
1753    }
1754}
1755
1756#[cfg(test)]
1757mod tests {
1758    use super::*;
1759    use serial_test::serial;
1760
1761    #[test]
1762    fn test_batch_config_default() {
1763        let config = BatchConfig::default();
1764        assert_eq!(config.parallel_threshold, 10);
1765        assert!(config.max_threads.is_none());
1766        assert_eq!(config.progress_interval, 1);
1767        assert!(!config.verbose);
1768    }
1769
1770    #[test]
1771    fn test_file_result_success() {
1772        let result = FileResult::success(PathBuf::from("test.hedl"), 42);
1773        assert!(result.is_success());
1774        assert!(!result.is_failure());
1775        assert_eq!(result.result.unwrap(), 42);
1776    }
1777
1778    #[test]
1779    fn test_file_result_failure() {
1780        let result: FileResult<()> =
1781            FileResult::failure(PathBuf::from("test.hedl"), CliError::NotCanonical);
1782        assert!(!result.is_success());
1783        assert!(result.is_failure());
1784        assert!(result.result.is_err());
1785    }
1786
1787    #[test]
1788    fn test_batch_results_statistics() {
1789        let results = vec![
1790            FileResult::success(PathBuf::from("a.hedl"), ()),
1791            FileResult::success(PathBuf::from("b.hedl"), ()),
1792            FileResult::failure(PathBuf::from("c.hedl"), CliError::NotCanonical),
1793        ];
1794
1795        let batch = BatchResults::new(results, 1000);
1796
1797        assert_eq!(batch.total_files(), 3);
1798        assert_eq!(batch.success_count(), 2);
1799        assert_eq!(batch.failure_count(), 1);
1800        assert!(!batch.all_succeeded());
1801        assert!(batch.has_failures());
1802        assert_eq!(batch.successes().count(), 2);
1803        assert_eq!(batch.failures().count(), 1);
1804    }
1805
1806    #[test]
1807    fn test_batch_results_throughput() {
1808        let results = vec![
1809            FileResult::success(PathBuf::from("a.hedl"), ()),
1810            FileResult::success(PathBuf::from("b.hedl"), ()),
1811        ];
1812
1813        let batch = BatchResults::new(results, 1000); // 1 second
1814        assert!((batch.throughput() - 2.0).abs() < 0.01);
1815
1816        let batch_zero: BatchResults<()> = BatchResults::new(vec![], 0);
1817        assert_eq!(batch_zero.throughput(), 0.0);
1818    }
1819
1820    #[test]
1821    fn test_progress_tracker_should_report() {
1822        let tracker = ProgressTracker::new(100, 10, false);
1823
1824        assert!(!tracker.should_report(1));
1825        assert!(!tracker.should_report(9));
1826        assert!(tracker.should_report(10)); // Interval boundary
1827        assert!(tracker.should_report(100)); // End
1828    }
1829
1830    // Mock operation for testing
1831    struct MockOperation {
1832        should_fail: bool,
1833    }
1834
1835    impl BatchOperation for MockOperation {
1836        type Output = String;
1837
1838        fn process_file(&self, path: &Path) -> Result<Self::Output, CliError> {
1839            if self.should_fail {
1840                Err(CliError::NotCanonical)
1841            } else {
1842                Ok(path.to_string_lossy().to_string())
1843            }
1844        }
1845
1846        fn name(&self) -> &'static str {
1847            "mock"
1848        }
1849    }
1850
1851    #[test]
1852    fn test_batch_processor_empty() {
1853        let processor = BatchProcessor::default_config();
1854        let results = processor
1855            .process(&[], MockOperation { should_fail: false }, false)
1856            .unwrap();
1857
1858        assert_eq!(results.total_files(), 0);
1859        assert!(results.all_succeeded());
1860    }
1861
1862    #[test]
1863    fn test_batch_processor_empty_with_progress_shows_warning() {
1864        // This test verifies that empty file list with show_progress=true
1865        // completes successfully (does not panic or return an error).
1866        // The actual warning output goes to stderr and is difficult to capture
1867        // in unit tests, but integration tests verify the output.
1868        let processor = BatchProcessor::default_config();
1869
1870        let results = processor
1871            .process(&[], MockOperation { should_fail: false }, true)
1872            .unwrap();
1873
1874        // Empty batch should succeed (not error)
1875        assert_eq!(results.total_files(), 0);
1876        assert_eq!(results.success_count(), 0);
1877        assert_eq!(results.failure_count(), 0);
1878        assert!(results.all_succeeded());
1879    }
1880
1881    #[test]
1882    fn test_batch_processor_empty_without_progress_silent() {
1883        // Verify that empty file list with show_progress=false succeeds silently
1884        let processor = BatchProcessor::default_config();
1885
1886        let results = processor
1887            .process(&[], MockOperation { should_fail: false }, false)
1888            .unwrap();
1889
1890        assert_eq!(results.total_files(), 0);
1891        assert!(results.all_succeeded());
1892        // No warning should be printed (verified via integration test)
1893    }
1894
1895    #[test]
1896    fn test_empty_batch_returns_ok_not_error() {
1897        // Ensure backward compatibility: empty batch is NOT an error condition
1898        let processor = BatchProcessor::default_config();
1899
1900        let result = processor.process(&[], MockOperation { should_fail: false }, true);
1901
1902        // Empty batch should return Ok, not Err
1903        assert!(result.is_ok());
1904
1905        let results = result.unwrap();
1906        assert_eq!(results.total_files(), 0);
1907        assert_eq!(results.success_count(), 0);
1908        assert_eq!(results.failure_count(), 0);
1909    }
1910
1911    #[test]
1912    fn test_batch_processor_serial_success() {
1913        let processor = BatchProcessor::new(BatchConfig {
1914            parallel_threshold: 100, // Force serial for small batch
1915            ..Default::default()
1916        });
1917
1918        let files = vec![
1919            PathBuf::from("a.hedl"),
1920            PathBuf::from("b.hedl"),
1921            PathBuf::from("c.hedl"),
1922        ];
1923
1924        let results = processor
1925            .process(&files, MockOperation { should_fail: false }, false)
1926            .unwrap();
1927
1928        assert_eq!(results.total_files(), 3);
1929        assert_eq!(results.success_count(), 3);
1930        assert_eq!(results.failure_count(), 0);
1931        assert!(results.all_succeeded());
1932    }
1933
1934    #[test]
1935    fn test_batch_processor_serial_with_failures() {
1936        let processor = BatchProcessor::new(BatchConfig {
1937            parallel_threshold: 100,
1938            ..Default::default()
1939        });
1940
1941        let files = vec![PathBuf::from("a.hedl"), PathBuf::from("b.hedl")];
1942
1943        let results = processor
1944            .process(&files, MockOperation { should_fail: true }, false)
1945            .unwrap();
1946
1947        assert_eq!(results.total_files(), 2);
1948        assert_eq!(results.success_count(), 0);
1949        assert_eq!(results.failure_count(), 2);
1950        assert!(!results.all_succeeded());
1951        assert!(results.has_failures());
1952    }
1953
1954    #[test]
1955    fn test_batch_processor_parallel() {
1956        let processor = BatchProcessor::new(BatchConfig {
1957            parallel_threshold: 2, // Force parallel
1958            ..Default::default()
1959        });
1960
1961        let files: Vec<PathBuf> = (0..20)
1962            .map(|i| PathBuf::from(format!("file{i}.hedl")))
1963            .collect();
1964
1965        let results = processor
1966            .process(&files, MockOperation { should_fail: false }, false)
1967            .unwrap();
1968
1969        assert_eq!(results.total_files(), 20);
1970        assert_eq!(results.success_count(), 20);
1971    }
1972
1973    #[test]
1974    fn test_validate_file_count_within_limit() {
1975        assert!(validate_file_count(100, Some(1000)).is_ok());
1976    }
1977
1978    #[test]
1979    fn test_validate_file_count_at_limit() {
1980        assert!(validate_file_count(1000, Some(1000)).is_ok());
1981    }
1982
1983    #[test]
1984    fn test_validate_file_count_exceeds_limit() {
1985        let result = validate_file_count(2000, Some(1000));
1986        assert!(result.is_err());
1987        let err = result.unwrap_err();
1988        assert!(err.to_string().contains("exceeds maximum limit"));
1989    }
1990
1991    #[test]
1992    fn test_validate_file_count_unlimited() {
1993        // None = unlimited
1994        assert!(validate_file_count(1_000_000, None).is_ok());
1995    }
1996
1997    #[test]
1998    fn test_validate_file_count_zero_files() {
1999        // Zero files always OK regardless of limit
2000        assert!(validate_file_count(0, Some(100)).is_ok());
2001    }
2002
2003    #[test]
2004    #[serial]
2005    fn test_get_max_batch_files_default() {
2006        std::env::remove_var("HEDL_MAX_BATCH_FILES");
2007        let max = get_max_batch_files();
2008        assert_eq!(max, 10_000);
2009    }
2010
2011    #[test]
2012    #[serial]
2013    fn test_get_max_batch_files_env_override() {
2014        std::env::set_var("HEDL_MAX_BATCH_FILES", "50000");
2015        let max = get_max_batch_files();
2016        assert_eq!(max, 50_000);
2017        std::env::remove_var("HEDL_MAX_BATCH_FILES");
2018    }
2019
2020    #[test]
2021    #[serial]
2022    fn test_get_max_batch_files_invalid_env() {
2023        std::env::set_var("HEDL_MAX_BATCH_FILES", "invalid");
2024        let max = get_max_batch_files();
2025        assert_eq!(max, 10_000); // Falls back to default
2026        std::env::remove_var("HEDL_MAX_BATCH_FILES");
2027    }
2028
2029    #[test]
2030    #[serial]
2031    fn test_batch_config_default_has_limit() {
2032        std::env::remove_var("HEDL_MAX_BATCH_FILES");
2033        let config = BatchConfig::default();
2034        assert!(config.max_files.is_some());
2035        assert_eq!(config.max_files.unwrap(), 10_000);
2036    }
2037
2038    #[test]
2039    fn test_warn_large_batch_above_threshold() {
2040        // Note: This test just verifies no panic, can't easily test stderr output
2041        warn_large_batch(5000, false);
2042    }
2043
2044    #[test]
2045    fn test_warn_large_batch_below_threshold() {
2046        warn_large_batch(500, false);
2047    }
2048
2049    #[test]
2050    fn test_warn_large_batch_verbose_suppresses() {
2051        warn_large_batch(5000, true);
2052    }
2053
2054    // ============================================================================
2055    // Thread Pool Tests
2056    // ============================================================================
2057
2058    #[test]
2059    fn test_local_thread_pool_creation() {
2060        let processor = BatchProcessor::new(BatchConfig {
2061            max_threads: Some(2),
2062            parallel_threshold: 1, // Force parallel even with 2 files
2063            ..Default::default()
2064        });
2065
2066        let files = vec![PathBuf::from("test1.hedl"), PathBuf::from("test2.hedl")];
2067
2068        let results = processor.process(&files, MockOperation { should_fail: false }, false);
2069        assert!(results.is_ok());
2070
2071        let results = results.unwrap();
2072        assert_eq!(results.total_files(), 2);
2073        assert_eq!(results.success_count(), 2);
2074        assert_eq!(results.failure_count(), 0);
2075    }
2076
2077    #[test]
2078    fn test_invalid_thread_count() {
2079        let processor = BatchProcessor::new(BatchConfig {
2080            max_threads: Some(0), // Invalid: zero threads
2081            parallel_threshold: 1,
2082            ..Default::default()
2083        });
2084
2085        let files = vec![PathBuf::from("test.hedl")];
2086        let results = processor.process(&files, MockOperation { should_fail: false }, false);
2087
2088        assert!(results.is_err());
2089        match results {
2090            Err(CliError::ThreadPoolError {
2091                requested_threads, ..
2092            }) => {
2093                assert_eq!(requested_threads, 0);
2094            }
2095            _ => panic!("Expected ThreadPoolError, got: {results:?}"),
2096        }
2097    }
2098
2099    #[test]
2100    fn test_concurrent_batch_operations_different_pools() {
2101        use std::sync::Arc;
2102        use std::thread;
2103
2104        let files = vec![PathBuf::from("test1.hedl"), PathBuf::from("test2.hedl")];
2105
2106        // Run two batch operations concurrently with different thread counts
2107        let processor1 = Arc::new(BatchProcessor::new(BatchConfig {
2108            max_threads: Some(2),
2109            parallel_threshold: 1,
2110            ..Default::default()
2111        }));
2112
2113        let processor2 = Arc::new(BatchProcessor::new(BatchConfig {
2114            max_threads: Some(4),
2115            parallel_threshold: 1,
2116            ..Default::default()
2117        }));
2118
2119        let files1 = files.clone();
2120        let p1 = processor1.clone();
2121        let handle1 =
2122            thread::spawn(move || p1.process(&files1, MockOperation { should_fail: false }, false));
2123
2124        let files2 = files.clone();
2125        let p2 = processor2.clone();
2126        let handle2 =
2127            thread::spawn(move || p2.process(&files2, MockOperation { should_fail: false }, false));
2128
2129        // Both should succeed with their respective configurations
2130        let result1 = handle1.join().unwrap();
2131        let result2 = handle2.join().unwrap();
2132
2133        assert!(result1.is_ok(), "First processor should succeed");
2134        assert!(result2.is_ok(), "Second processor should succeed");
2135
2136        let results1 = result1.unwrap();
2137        let results2 = result2.unwrap();
2138
2139        assert_eq!(results1.total_files(), 2);
2140        assert_eq!(results1.success_count(), 2);
2141        assert_eq!(results2.total_files(), 2);
2142        assert_eq!(results2.success_count(), 2);
2143    }
2144
2145    #[test]
2146    fn test_default_config_uses_global_pool() {
2147        // Verify that default config (no max_threads) doesn't create local pool
2148        let processor = BatchProcessor::default_config();
2149
2150        let files = vec![
2151            PathBuf::from("test1.hedl"),
2152            PathBuf::from("test2.hedl"),
2153            PathBuf::from("test3.hedl"),
2154            PathBuf::from("test4.hedl"),
2155            PathBuf::from("test5.hedl"),
2156            PathBuf::from("test6.hedl"),
2157            PathBuf::from("test7.hedl"),
2158            PathBuf::from("test8.hedl"),
2159            PathBuf::from("test9.hedl"),
2160            PathBuf::from("test10.hedl"),
2161        ];
2162
2163        let results = processor.process(&files, MockOperation { should_fail: false }, false);
2164        assert!(results.is_ok());
2165
2166        let results = results.unwrap();
2167        assert_eq!(results.total_files(), 10);
2168        assert_eq!(results.success_count(), 10);
2169        // This should use global pool, not create a local one
2170    }
2171
2172    #[test]
2173    fn test_local_pool_with_failures() {
2174        // Verify that local thread pool works correctly even when operations fail
2175        let processor = BatchProcessor::new(BatchConfig {
2176            max_threads: Some(3),
2177            parallel_threshold: 1,
2178            ..Default::default()
2179        });
2180
2181        let files = vec![
2182            PathBuf::from("test1.hedl"),
2183            PathBuf::from("test2.hedl"),
2184            PathBuf::from("test3.hedl"),
2185        ];
2186
2187        let results = processor.process(&files, MockOperation { should_fail: true }, false);
2188        assert!(results.is_ok());
2189
2190        let results = results.unwrap();
2191        assert_eq!(results.total_files(), 3);
2192        assert_eq!(results.success_count(), 0);
2193        assert_eq!(results.failure_count(), 3);
2194    }
2195
2196    #[test]
2197    fn test_serial_processing_ignores_max_threads() {
2198        // When file count is below parallel_threshold, max_threads should be ignored
2199        let processor = BatchProcessor::new(BatchConfig {
2200            max_threads: Some(8),
2201            parallel_threshold: 100, // High threshold forces serial
2202            ..Default::default()
2203        });
2204
2205        let files = vec![PathBuf::from("test1.hedl"), PathBuf::from("test2.hedl")];
2206
2207        let results = processor.process(&files, MockOperation { should_fail: false }, false);
2208        assert!(results.is_ok());
2209
2210        let results = results.unwrap();
2211        assert_eq!(results.total_files(), 2);
2212        assert_eq!(results.success_count(), 2);
2213    }
2214
2215    #[test]
2216    fn test_local_pool_single_thread() {
2217        // Test that a local pool with just 1 thread works correctly
2218        let processor = BatchProcessor::new(BatchConfig {
2219            max_threads: Some(1),
2220            parallel_threshold: 1,
2221            ..Default::default()
2222        });
2223
2224        let files = vec![
2225            PathBuf::from("test1.hedl"),
2226            PathBuf::from("test2.hedl"),
2227            PathBuf::from("test3.hedl"),
2228        ];
2229
2230        let results = processor.process(&files, MockOperation { should_fail: false }, false);
2231        assert!(results.is_ok());
2232
2233        let results = results.unwrap();
2234        assert_eq!(results.total_files(), 3);
2235        assert_eq!(results.success_count(), 3);
2236    }
2237
2238    #[test]
2239    fn test_local_pool_many_threads() {
2240        // Test that a local pool with many threads works correctly
2241        let processor = BatchProcessor::new(BatchConfig {
2242            max_threads: Some(16),
2243            parallel_threshold: 1,
2244            ..Default::default()
2245        });
2246
2247        let files: Vec<PathBuf> = (0..32)
2248            .map(|i| PathBuf::from(format!("file{i}.hedl")))
2249            .collect();
2250
2251        let results = processor.process(&files, MockOperation { should_fail: false }, false);
2252        assert!(results.is_ok());
2253
2254        let results = results.unwrap();
2255        assert_eq!(results.total_files(), 32);
2256        assert_eq!(results.success_count(), 32);
2257    }
2258
2259    #[test]
2260    fn test_thread_pool_error_message() {
2261        let processor = BatchProcessor::new(BatchConfig {
2262            max_threads: Some(0),
2263            parallel_threshold: 1,
2264            ..Default::default()
2265        });
2266
2267        let files = vec![PathBuf::from("test.hedl")];
2268        let result = processor.process(&files, MockOperation { should_fail: false }, false);
2269
2270        match result {
2271            Err(CliError::ThreadPoolError {
2272                message,
2273                requested_threads,
2274            }) => {
2275                assert_eq!(requested_threads, 0);
2276                assert!(message.contains("0 threads"), "Message: {message}");
2277            }
2278            _ => panic!("Expected ThreadPoolError"),
2279        }
2280    }
2281}