hedl_cli/batch.rs
1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Batch processing for multiple HEDL files with parallel execution and progress reporting.
19//!
20//! This module provides efficient batch processing capabilities for operations on multiple
21//! HEDL files. It uses Rayon for parallel processing when beneficial and provides real-time
22//! progress reporting with detailed error tracking.
23//!
24//! # Features
25//!
26//! - **Parallel Processing**: Automatic parallelization using Rayon's work-stealing scheduler
27//! - **Progress Reporting**: Real-time progress with file counts and success/failure tracking
28//! - **Error Resilience**: Continues processing on errors, collecting all failures for reporting
29//! - **Performance Optimization**: Intelligent parallel/serial mode selection based on workload
30//! - **Type Safety**: Strongly typed operation definitions with compile-time guarantees
31//!
32//! # Architecture
33//!
34//! The batch processing system uses a functional architecture with:
35//! - Operation trait for extensible batch operations
36//! - Result aggregation with detailed error context
37//! - Atomic counters for thread-safe progress tracking
38//! - Zero-copy file path handling
39//!
40//! # Examples
41//!
42//! ```rust,no_run
43//! use hedl_cli::batch::{BatchProcessor, BatchConfig, ValidationOperation};
44//! use std::path::PathBuf;
45//!
46//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
47//! // Create a batch processor with default configuration
48//! let processor = BatchProcessor::new(BatchConfig::default());
49//!
50//! // Validate multiple files in parallel
51//! let files = vec![
52//! PathBuf::from("file1.hedl"),
53//! PathBuf::from("file2.hedl"),
54//! PathBuf::from("file3.hedl"),
55//! ];
56//!
57//! let operation = ValidationOperation { strict: true };
58//! let results = processor.process(&files, operation, true)?;
59//!
60//! println!("Processed {} files, {} succeeded, {} failed",
61//! results.total_files(),
62//! results.success_count(),
63//! results.failure_count()
64//! );
65//! # Ok(())
66//! # }
67//! ```
68//!
69//! # Performance Characteristics
70//!
71//! - **Small batches (< 10 files)**: Serial processing to avoid overhead
72//! - **Medium batches (10-100 files)**: Parallel with Rayon thread pool
73//! - **Large batches (> 100 files)**: Chunked parallel processing with progress updates
74//!
75//! # Thread Safety
76//!
77//! All progress tracking uses atomic operations for lock-free concurrent access.
78//! Operations are required to be Send + Sync for parallel execution.
79//!
80//! # Thread Pool Management
81//!
82//! The batch processor supports two thread pool strategies:
83//!
84//! ## Global Thread Pool (Default)
85//!
86//! When `max_threads` is `None`, operations use Rayon's global thread pool:
87//! - Zero overhead (no pool creation)
88//! - Shared across all Rayon operations in the process
89//! - Thread count typically matches CPU core count
90//!
91//! ## Local Thread Pool (Isolated)
92//!
93//! When `max_threads` is `Some(n)`, each operation creates an isolated local pool:
94//! - Guaranteed thread count of exactly `n` threads
95//! - No global state pollution
96//! - Supports concurrent operations with different configurations
97//! - Small creation overhead (~0.5-1ms) and memory cost (~2-8MB per thread)
98//!
99//! # Examples
100//!
101//! ```rust,no_run
102//! use hedl_cli::batch::{BatchProcessor, BatchConfig};
103//! use std::path::PathBuf;
104//!
105//! // Concurrent operations with different thread counts
106//! use std::thread;
107//!
108//! let files: Vec<PathBuf> = vec!["a.hedl".into(), "b.hedl".into()];
109//!
110//! let handle1 = thread::spawn(|| {
111//! let processor = BatchProcessor::new(BatchConfig {
112//! max_threads: Some(2),
113//! ..Default::default()
114//! });
115//! // Uses 2 threads
116//! });
117//!
118//! let handle2 = thread::spawn(|| {
119//! let processor = BatchProcessor::new(BatchConfig {
120//! max_threads: Some(4),
121//! ..Default::default()
122//! });
123//! // Uses 4 threads, isolated from handle1
124//! });
125//! ```
126
127use crate::error::CliError;
128use colored::Colorize;
129use rayon::prelude::*;
130use std::collections::HashSet;
131use std::path::{Path, PathBuf};
132use std::sync::atomic::{AtomicUsize, Ordering};
133use std::sync::Arc;
134use std::time::Instant;
135
/// Configuration for batch processing operations.
///
/// Controls parallelization strategy, thread pool selection, progress
/// reporting, and resource limits for batch operations.
///
/// # Examples
///
/// ```rust
/// use hedl_cli::batch::BatchConfig;
///
/// // Default configuration (auto parallelization, global thread pool)
/// let config = BatchConfig::default();
///
/// // Custom configuration
/// let config = BatchConfig {
///     parallel_threshold: 5,    // Parallelize if >= 5 files
///     max_threads: Some(4),     // Use at most 4 threads
///     progress_interval: 10,    // Update progress every 10 files
///     verbose: true,            // Show detailed progress
///     max_files: Some(10_000),  // Limit to 10,000 files
/// };
/// ```
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Minimum number of files to trigger parallel processing.
    ///
    /// Batches smaller than this are processed serially to avoid thread pool
    /// overhead. Default: 10.
    pub parallel_threshold: usize,

    /// Maximum number of threads to use for parallel processing.
    ///
    /// - `None` (default): uses Rayon's global thread pool (typically one
    ///   thread per CPU core) with zero setup overhead.
    /// - `Some(n)`: creates an isolated local thread pool with exactly `n`
    ///   threads for each `process()` call. This guarantees the setting
    ///   takes effect, avoids global state pollution, and allows concurrent
    ///   batch operations with different thread counts, at a small cost
    ///   (~0.5-1ms pool creation, ~2-8MB memory per thread).
    ///
    /// `Some(0)` is invalid and causes `process()` to return a
    /// thread pool error instead of processing any files.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use hedl_cli::batch::BatchConfig;
    ///
    /// // Default: uses global pool
    /// let config = BatchConfig::default();
    ///
    /// // Custom: creates local pool with 4 threads
    /// let config = BatchConfig {
    ///     max_threads: Some(4),
    ///     ..Default::default()
    /// };
    /// ```
    ///
    /// Default: None
    pub max_threads: Option<usize>,

    /// Number of files between progress updates.
    ///
    /// Progress is printed every N files processed. Set to 0 to disable
    /// interval reporting. Default: 1 (update after each file).
    pub progress_interval: usize,

    /// Enable verbose progress reporting.
    ///
    /// When true, shows file names and per-file status instead of aggregate
    /// counts. Default: false.
    pub verbose: bool,

    /// Maximum number of files allowed in a batch operation.
    ///
    /// Guards against resource exhaustion (memory for stored paths, file
    /// descriptors, unbounded CPU time) when glob patterns match huge sets.
    ///
    /// - `Some(n)`: limit to `n` files. Default: 10,000, overridable via the
    ///   `HEDL_MAX_BATCH_FILES` environment variable, the `--max-files` CLI
    ///   flag, or programmatically: `BatchConfig { max_files: Some(n), .. }`.
    /// - `None`: no limit (use with caution).
    ///
    /// # Examples
    ///
    /// ```rust
    /// use hedl_cli::batch::BatchConfig;
    ///
    /// // Custom limit
    /// let config = BatchConfig {
    ///     max_files: Some(50_000),
    ///     ..Default::default()
    /// };
    ///
    /// // Unlimited (use with caution)
    /// let config = BatchConfig {
    ///     max_files: None,
    ///     ..Default::default()
    /// };
    /// ```
    pub max_files: Option<usize>,
}
260
261impl Default for BatchConfig {
262 fn default() -> Self {
263 Self {
264 parallel_threshold: 10,
265 max_threads: None,
266 progress_interval: 1,
267 verbose: false,
268 max_files: Some(get_max_batch_files()),
269 }
270 }
271}
272
/// Resolve the maximum batch file count.
///
/// Reads the `HEDL_MAX_BATCH_FILES` environment variable; an unset,
/// non-UTF-8, or unparseable value falls back to the built-in default of
/// 10,000 files.
///
/// # Examples
///
/// ```bash
/// export HEDL_MAX_BATCH_FILES=50000
/// hedl batch-validate "*.hedl"
/// ```
fn get_max_batch_files() -> usize {
    const DEFAULT_MAX_BATCH_FILES: usize = 10_000;

    match std::env::var("HEDL_MAX_BATCH_FILES") {
        // Present: honor it only if it parses as a usize.
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_MAX_BATCH_FILES),
        // Absent or not valid Unicode: use the default.
        Err(_) => DEFAULT_MAX_BATCH_FILES,
    }
}
292
293/// Validate file count against configured limit.
294///
295/// # Arguments
296///
297/// * `file_count` - Number of files to process
298/// * `max_files` - Maximum allowed files (None = unlimited)
299///
300/// # Returns
301///
302/// * `Ok(())` - File count is within limit
303/// * `Err(CliError)` - File count exceeds limit
304///
305/// # Examples
306///
307/// ```rust
308/// use hedl_cli::batch::validate_file_count;
309///
310/// // Within limit
311/// assert!(validate_file_count(100, Some(1000)).is_ok());
312///
313/// // Exceeds limit
314/// assert!(validate_file_count(2000, Some(1000)).is_err());
315///
316/// // Unlimited
317/// assert!(validate_file_count(1_000_000, None).is_ok());
318/// ```
319pub fn validate_file_count(file_count: usize, max_files: Option<usize>) -> Result<(), CliError> {
320 if let Some(limit) = max_files {
321 if file_count > limit {
322 return Err(CliError::invalid_input(format!(
323 "File count ({file_count}) exceeds maximum limit ({limit}). \
324 Consider:\n \
325 - Refining glob patterns to match fewer files\n \
326 - Using --max-files flag to increase limit\n \
327 - Setting HEDL_MAX_BATCH_FILES environment variable\n \
328 - Processing files in smaller batches"
329 )));
330 }
331 }
332 Ok(())
333}
334
335/// Warn if file count is large and suggest verbose mode.
336///
337/// Prints a warning when processing many files to inform user of operation scale.
338///
339/// # Arguments
340///
341/// * `file_count` - Number of files to process
342/// * `verbose` - Whether verbose mode is enabled
343///
344/// # Threshold
345///
346/// Warns if `file_count` >= 1000 and not already in verbose mode.
347pub fn warn_large_batch(file_count: usize, verbose: bool) {
348 const WARN_THRESHOLD: usize = 1_000;
349
350 if file_count >= WARN_THRESHOLD && !verbose {
351 eprintln!(
352 "{} Processing {} files. Consider using {} for progress updates.",
353 "Warning:".yellow().bold(),
354 file_count.to_string().bright_white(),
355 "--verbose".bright_cyan()
356 );
357 }
358}
359
/// Result of processing a single file in a batch operation.
///
/// Pairs the processed file's path with either the operation's success value
/// or the error it produced, so failures can be reported per file.
///
/// # Type Parameters
///
/// * `T` - The success type returned by the operation
#[derive(Debug, Clone)]
pub struct FileResult<T> {
    /// The file path that was processed
    pub path: PathBuf,
    /// The result of processing (Ok or Err)
    pub result: Result<T, CliError>,
}
374
375impl<T> FileResult<T> {
376 /// Create a successful file result.
377 pub fn success(path: PathBuf, value: T) -> Self {
378 Self {
379 path,
380 result: Ok(value),
381 }
382 }
383
384 /// Create a failed file result.
385 #[must_use]
386 pub fn failure(path: PathBuf, error: CliError) -> Self {
387 Self {
388 path,
389 result: Err(error),
390 }
391 }
392
393 /// Check if the result is successful.
394 pub fn is_success(&self) -> bool {
395 self.result.is_ok()
396 }
397
398 /// Check if the result is a failure.
399 pub fn is_failure(&self) -> bool {
400 self.result.is_err()
401 }
402}
403
/// Aggregated results from a batch processing operation.
///
/// Holds every per-file [`FileResult`] plus the total wall-clock time, and
/// provides convenience statistics (counts, success/failure iterators, and
/// throughput).
///
/// # Type Parameters
///
/// * `T` - The success type returned by the operation
#[derive(Debug, Clone)]
pub struct BatchResults<T> {
    /// Individual results for each processed file
    pub results: Vec<FileResult<T>>,
    /// Total processing time in milliseconds
    pub elapsed_ms: u128,
}
418
419impl<T> BatchResults<T> {
420 /// Create new batch results from a vector of file results.
421 #[must_use]
422 pub fn new(results: Vec<FileResult<T>>, elapsed_ms: u128) -> Self {
423 Self {
424 results,
425 elapsed_ms,
426 }
427 }
428
429 /// Get the total number of files processed.
430 #[must_use]
431 pub fn total_files(&self) -> usize {
432 self.results.len()
433 }
434
435 /// Get the number of successfully processed files.
436 #[must_use]
437 pub fn success_count(&self) -> usize {
438 self.results.iter().filter(|r| r.is_success()).count()
439 }
440
441 /// Get the number of failed files.
442 #[must_use]
443 pub fn failure_count(&self) -> usize {
444 self.results.iter().filter(|r| r.is_failure()).count()
445 }
446
447 /// Check if all files were processed successfully.
448 #[must_use]
449 pub fn all_succeeded(&self) -> bool {
450 self.results.iter().all(FileResult::is_success)
451 }
452
453 /// Check if any files failed.
454 #[must_use]
455 pub fn has_failures(&self) -> bool {
456 self.results.iter().any(FileResult::is_failure)
457 }
458
459 /// Get an iterator over successful results.
460 pub fn successes(&self) -> impl Iterator<Item = &FileResult<T>> {
461 self.results.iter().filter(|r| r.is_success())
462 }
463
464 /// Get an iterator over failed results.
465 pub fn failures(&self) -> impl Iterator<Item = &FileResult<T>> {
466 self.results.iter().filter(|r| r.is_failure())
467 }
468
469 /// Get processing throughput in files per second.
470 #[must_use]
471 pub fn throughput(&self) -> f64 {
472 if self.elapsed_ms == 0 {
473 0.0
474 } else {
475 (self.total_files() as f64) / (self.elapsed_ms as f64 / 1000.0)
476 }
477 }
478}
479
/// Trait for batch operations on HEDL files.
///
/// Implement this trait to define custom batch operations. The operation must
/// be thread-safe (`Send + Sync`) so the processor can run it in parallel.
///
/// # Type Parameters
///
/// * `Output` - The type returned on successful processing of a file
///
/// # Examples
///
/// ```rust
/// use hedl_cli::batch::BatchOperation;
/// use hedl_cli::error::CliError;
/// use std::path::Path;
///
/// struct CountLinesOperation;
///
/// impl BatchOperation for CountLinesOperation {
///     type Output = usize;
///
///     fn process_file(&self, path: &Path) -> Result<Self::Output, CliError> {
///         let content = std::fs::read_to_string(path)
///             .map_err(|e| CliError::io_error(path, e))?;
///         Ok(content.lines().count())
///     }
///
///     fn name(&self) -> &str {
///         "count-lines"
///     }
/// }
/// ```
pub trait BatchOperation: Send + Sync {
    /// The output type for successful processing
    type Output: Send;

    /// Process a single file and return the result.
    ///
    /// # Arguments
    ///
    /// * `path` - The path to the file to process
    ///
    /// # Returns
    ///
    /// * `Ok(Output)` - On successful processing
    /// * `Err(CliError)` - On any error
    ///
    /// # Errors
    ///
    /// Should return appropriate `CliError` variants for different failure
    /// modes (I/O, parsing, validation, ...).
    fn process_file(&self, path: &Path) -> Result<Self::Output, CliError>;

    /// Get a human-readable name for this operation.
    ///
    /// Used for progress reporting and logging (e.g. in the batch summary).
    fn name(&self) -> &str;
}
537
/// Trait for streaming batch operations on HEDL files.
///
/// Unlike `BatchOperation` which loads entire files into memory,
/// streaming operations process files incrementally with constant memory usage.
/// This is ideal for processing large files (>100MB) or when memory is constrained.
///
/// # Memory Characteristics
///
/// - **Standard operations**: `O(num_threads × file_size)`
/// - **Streaming operations**: `O(buffer_size + ID_set)` ≈ constant
///
/// # Type Parameters
///
/// * `Output` - The type returned on successful processing of a file
///
/// # Examples
///
/// ```rust
/// use hedl_cli::batch::StreamingBatchOperation;
/// use hedl_cli::error::CliError;
/// use std::path::Path;
///
/// struct StreamingCountOperation;
///
/// impl StreamingBatchOperation for StreamingCountOperation {
///     type Output = usize;
///
///     fn process_file_streaming(&self, path: &Path) -> Result<Self::Output, CliError> {
///         use std::io::BufReader;
///         use std::fs::File;
///         use hedl_stream::StreamingParser;
///
///         let file = File::open(path).map_err(|e| CliError::io_error(path, e))?;
///         let reader = BufReader::new(file);
///         let parser = StreamingParser::new(reader)
///             .map_err(|e| CliError::parse(e.to_string()))?;
///
///         let count = parser.filter(|e| {
///             matches!(e, Ok(hedl_stream::NodeEvent::Node(_)))
///         }).count();
///
///         Ok(count)
///     }
///
///     fn name(&self) -> &str {
///         "count-streaming"
///     }
/// }
/// ```
pub trait StreamingBatchOperation: Send + Sync {
    /// The output type for successful processing
    type Output: Send;

    /// Process a file using streaming parser.
    ///
    /// # Arguments
    ///
    /// * `path` - File path to process
    ///
    /// # Returns
    ///
    /// * `Ok(Output)` - On successful processing
    /// * `Err(CliError)` - On any error
    ///
    /// # Memory Guarantee
    ///
    /// Implementations should maintain O(1) memory usage regardless of file size,
    /// processing the file incrementally using the streaming parser.
    fn process_file_streaming(&self, path: &Path) -> Result<Self::Output, CliError>;

    /// Get operation name for progress reporting
    fn name(&self) -> &str;

    /// Indicate if this operation can run in streaming mode.
    ///
    /// Some operations (like formatting) may require the full document;
    /// those should override this to return `false`. Default: true.
    fn supports_streaming(&self) -> bool {
        true
    }
}
619
/// Progress tracker for batch operations.
///
/// Uses atomic counters so worker threads can record outcomes and report
/// progress without locks.
#[derive(Debug)]
struct ProgressTracker {
    // Total number of files in the batch (fixed up front).
    total: usize,
    // Files processed so far (successes + failures).
    processed: AtomicUsize,
    // Files processed successfully.
    succeeded: AtomicUsize,
    // Files that failed.
    failed: AtomicUsize,
    // Report progress every `interval` files; 0 disables interval reports.
    interval: usize,
    // Verbose mode: per-file status lines instead of aggregate counts.
    verbose: bool,
    // Batch start time, used for rate and elapsed-time calculations.
    start_time: Instant,
}
633
impl ProgressTracker {
    /// Create a tracker for `total` files, reporting every `interval` files.
    fn new(total: usize, interval: usize, verbose: bool) -> Self {
        Self {
            total,
            processed: AtomicUsize::new(0),
            succeeded: AtomicUsize::new(0),
            failed: AtomicUsize::new(0),
            interval,
            verbose,
            start_time: Instant::now(),
        }
    }

    /// Record a successful file processing.
    ///
    /// Relaxed ordering is used throughout: the counters are independent
    /// tallies read only for display, with no cross-counter invariants.
    fn record_success(&self, path: &Path) {
        // fetch_add returns the previous value, so +1 yields this file's
        // 1-based position in the processed sequence.
        let processed = self.processed.fetch_add(1, Ordering::Relaxed) + 1;
        self.succeeded.fetch_add(1, Ordering::Relaxed);

        if self.should_report(processed) {
            self.report_progress(path, true);
        }
    }

    /// Record a failed file processing.
    ///
    /// In verbose mode the failure is printed immediately with its error,
    /// in addition to any interval-based progress report.
    fn record_failure(&self, path: &Path, error: &CliError) {
        let processed = self.processed.fetch_add(1, Ordering::Relaxed) + 1;
        self.failed.fetch_add(1, Ordering::Relaxed);

        if self.verbose {
            eprintln!("{} {} - {}", "✗".red().bold(), path.display(), error);
        }

        if self.should_report(processed) {
            self.report_progress(path, false);
        }
    }

    /// Check if progress should be reported for this count.
    ///
    /// Reports on every `interval`-th file and always on the final file;
    /// an interval of 0 disables interval reporting entirely.
    fn should_report(&self, processed: usize) -> bool {
        self.interval > 0 && (processed % self.interval == 0 || processed == self.total)
    }

    /// Report current progress to stderr.
    ///
    /// The counters are re-read here, so under parallel execution the
    /// printed totals may already include files finished by other threads
    /// after the event that triggered this report.
    fn report_progress(&self, current_file: &Path, success: bool) {
        let processed = self.processed.load(Ordering::Relaxed);
        let succeeded = self.succeeded.load(Ordering::Relaxed);
        let failed = self.failed.load(Ordering::Relaxed);
        let elapsed = self.start_time.elapsed();
        // No zero guard: a near-instant first report can show a very large
        // or non-finite rate. Cosmetic only.
        let rate = processed as f64 / elapsed.as_secs_f64();

        if self.verbose {
            let status = if success {
                "✓".green().bold()
            } else {
                "✗".red().bold()
            };
            eprintln!(
                "{} [{}/{}] {} ({:.1} files/s)",
                status,
                processed,
                self.total,
                current_file.display(),
                rate
            );
        } else {
            eprintln!(
                "Progress: [{}/{}] {} succeeded, {} failed ({:.1} files/s)",
                processed, self.total, succeeded, failed, rate
            );
        }
    }

    /// Print final summary.
    ///
    /// Writes a banner with totals, elapsed time, and throughput to stdout
    /// (progress goes to stderr; the summary is part of normal output).
    fn print_summary(&self, operation_name: &str) {
        let processed = self.processed.load(Ordering::Relaxed);
        let succeeded = self.succeeded.load(Ordering::Relaxed);
        let failed = self.failed.load(Ordering::Relaxed);
        let elapsed = self.start_time.elapsed();

        println!();
        println!("{}", "═".repeat(60).bright_blue());
        println!(
            "{} {}",
            "Batch Operation:".bright_blue().bold(),
            operation_name.bright_white()
        );
        println!("{}", "═".repeat(60).bright_blue());
        println!(
            " {} {}",
            "Total files:".bright_cyan(),
            processed.to_string().bright_white()
        );
        println!(
            " {} {}",
            "Succeeded:".green().bold(),
            succeeded.to_string().bright_white()
        );
        println!(
            " {} {}",
            "Failed:".red().bold(),
            failed.to_string().bright_white()
        );
        println!(
            " {} {:.2}s",
            "Elapsed:".bright_cyan(),
            elapsed.as_secs_f64()
        );
        println!(
            " {} {:.1} files/s",
            "Throughput:".bright_cyan(),
            processed as f64 / elapsed.as_secs_f64()
        );
        println!("{}", "═".repeat(60).bright_blue());
    }
}
750
/// High-performance batch processor for HEDL files.
///
/// Orchestrates parallel or serial processing based on configuration and
/// workload size, tracks progress with lock-free counters, and collects all
/// per-file results instead of aborting on the first failure.
///
/// # Thread Safety
///
/// `BatchProcessor` is thread-safe and can be shared across threads via Arc.
///
/// # Examples
///
/// ```rust,no_run
/// use hedl_cli::batch::{BatchProcessor, BatchConfig, ValidationOperation};
/// use std::path::PathBuf;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let processor = BatchProcessor::new(BatchConfig {
///     parallel_threshold: 5,
///     verbose: true,
///     ..Default::default()
/// });
///
/// let files: Vec<PathBuf> = vec![
///     "file1.hedl".into(),
///     "file2.hedl".into(),
/// ];
///
/// let results = processor.process(
///     &files,
///     ValidationOperation { strict: false },
///     true,
/// )?;
///
/// if results.has_failures() {
///     eprintln!("Some files failed validation");
///     for failure in results.failures() {
///         eprintln!("  - {}: {:?}", failure.path.display(), failure.result);
///     }
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct BatchProcessor {
    // Processing configuration: thresholds, thread count, progress, limits.
    config: BatchConfig,
}
797
798impl BatchProcessor {
799 /// Create a new batch processor with the given configuration.
800 #[must_use]
801 pub fn new(config: BatchConfig) -> Self {
802 Self { config }
803 }
804
805 /// Create a batch processor with default configuration.
806 #[must_use]
807 pub fn default_config() -> Self {
808 Self::new(BatchConfig::default())
809 }
810
811 /// Process multiple files with the given operation.
812 ///
813 /// Automatically selects parallel or serial processing based on configuration
814 /// and file count. Provides progress reporting and collects all results.
815 ///
816 /// # Arguments
817 ///
818 /// * `files` - Slice of file paths to process
819 /// * `operation` - The operation to perform on each file
820 /// * `show_progress` - Whether to show progress updates
821 ///
822 /// # Returns
823 ///
824 /// * `Ok(BatchResults)` - Successfully processed all files (individual failures collected in results)
825 /// * `Err(CliError::ThreadPoolError)` - Failed to create thread pool with requested configuration
826 ///
827 /// # Thread Pool Selection
828 ///
829 /// The method uses different thread pool strategies based on configuration:
830 ///
831 /// 1. **Serial Processing**: If `files.len() < parallel_threshold`, processes serially (no thread pool)
832 /// 2. **Local Thread Pool**: If `max_threads` is `Some(n)`, creates isolated pool with `n` threads
833 /// 3. **Global Thread Pool**: If `max_threads` is `None`, uses Rayon's global pool
834 ///
835 /// # Error Handling
836 ///
837 /// Thread pool creation can fail if:
838 /// - `max_threads` is 0 (invalid configuration)
839 /// - System cannot allocate thread resources
840 /// - Thread stack allocation fails
841 ///
842 /// Individual file processing errors are collected in `BatchResults`, not returned as errors.
843 ///
844 /// # Performance
845 ///
846 /// - Serial processing for small batches to avoid thread pool overhead
847 /// - Local thread pool: ~0.5-1ms creation overhead, ~2-8MB per thread
848 /// - Global thread pool: zero overhead
849 /// - Lock-free progress tracking using atomic counters
850 ///
851 /// # Examples
852 ///
853 /// ```rust,no_run
854 /// use hedl_cli::batch::{BatchProcessor, BatchConfig, FormatOperation};
855 /// use hedl_cli::error::CliError;
856 /// use std::path::PathBuf;
857 ///
858 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
859 /// let processor = BatchProcessor::new(BatchConfig {
860 /// max_threads: Some(4),
861 /// ..Default::default()
862 /// });
863 ///
864 /// let files = vec![PathBuf::from("a.hedl"), PathBuf::from("b.hedl")];
865 ///
866 /// match processor.process(
867 /// &files,
868 /// FormatOperation {
869 /// check: false,
870 /// ditto: true,
871 /// with_counts: false,
872 /// },
873 /// true,
874 /// ) {
875 /// Ok(results) => {
876 /// println!("Formatted {} files", results.success_count());
877 /// if results.has_failures() {
878 /// // Handle individual file failures
879 /// }
880 /// }
881 /// Err(CliError::ThreadPoolError { message, requested_threads }) => {
882 /// eprintln!("Failed to create thread pool: {}", message);
883 /// }
884 /// Err(e) => {
885 /// eprintln!("Unexpected error: {}", e);
886 /// }
887 /// }
888 /// # Ok(())
889 /// # }
890 /// ```
891 pub fn process<O>(
892 &self,
893 files: &[PathBuf],
894 operation: O,
895 show_progress: bool,
896 ) -> Result<BatchResults<O::Output>, CliError>
897 where
898 O: BatchOperation,
899 {
900 let start_time = Instant::now();
901
902 if files.is_empty() {
903 // Warn the user when no files match the provided patterns.
904 // Only show warning when progress is enabled, as this indicates
905 // the user expects feedback. Silent mode implies automated/scripted usage.
906 if show_progress {
907 eprintln!(
908 "{} No files matched the provided patterns",
909 "Warning:".yellow().bold()
910 );
911 eprintln!(
912 "{} Check that patterns are correct and files exist",
913 "Hint:".cyan()
914 );
915 }
916 return Ok(BatchResults::new(vec![], 0));
917 }
918
919 let results = if files.len() < self.config.parallel_threshold {
920 // Serial processing for small batches (no thread pool needed)
921 self.process_serial(files, &operation, show_progress)
922 } else if let Some(max_threads) = self.config.max_threads {
923 // Use local thread pool with specified thread count
924 self.process_with_local_pool(files, &operation, show_progress, max_threads)?
925 } else {
926 // Use default global thread pool (Rayon's default)
927 self.process_parallel(files, &operation, show_progress)
928 };
929
930 let elapsed_ms = start_time.elapsed().as_millis();
931
932 Ok(BatchResults::new(results, elapsed_ms))
933 }
934
935 /// Process files using a local thread pool with specified thread count.
936 ///
937 /// Creates an isolated Rayon thread pool that doesn't affect global state.
938 /// The thread pool is created for this operation and destroyed when complete.
939 ///
940 /// # Arguments
941 ///
942 /// * `files` - Slice of file paths to process
943 /// * `operation` - The operation to perform on each file
944 /// * `show_progress` - Whether to show progress updates
945 /// * `num_threads` - The number of threads to use
946 ///
947 /// # Returns
948 ///
949 /// * `Ok(Vec<FileResult>)` - Successfully processed files with local pool
950 /// * `Err(CliError::ThreadPoolError)` - Failed to create thread pool
951 ///
952 /// # Errors
953 ///
954 /// Returns `ThreadPoolError` if:
955 /// - `num_threads` is 0 (invalid configuration)
956 /// - System cannot allocate thread resources
957 /// - Thread stack allocation fails
958 ///
959 /// # Performance
960 ///
961 /// - Thread pool creation: ~0.5-1ms overhead
962 /// - Memory cost: ~2-8MB per thread (OS thread stacks)
963 /// - Pool lifetime: Duration of this method call
964 fn process_with_local_pool<O>(
965 &self,
966 files: &[PathBuf],
967 operation: &O,
968 show_progress: bool,
969 num_threads: usize,
970 ) -> Result<Vec<FileResult<O::Output>>, CliError>
971 where
972 O: BatchOperation,
973 {
974 // Validate thread count - 0 threads is invalid
975 if num_threads == 0 {
976 return Err(CliError::thread_pool_error(
977 "Cannot create thread pool with 0 threads".to_string(),
978 num_threads,
979 ));
980 }
981
982 // Build a local thread pool
983 let pool = rayon::ThreadPoolBuilder::new()
984 .num_threads(num_threads)
985 .build()
986 .map_err(|e| {
987 CliError::thread_pool_error(
988 format!("Failed to create thread pool with {num_threads} threads: {e}"),
989 num_threads,
990 )
991 })?;
992
993 // Run parallel processing within the local pool
994 let results = pool.install(|| self.process_parallel(files, operation, show_progress));
995
996 Ok(results)
997 }
998
999 /// Process files serially (single-threaded).
1000 fn process_serial<O>(
1001 &self,
1002 files: &[PathBuf],
1003 operation: &O,
1004 show_progress: bool,
1005 ) -> Vec<FileResult<O::Output>>
1006 where
1007 O: BatchOperation,
1008 {
1009 let tracker = if show_progress {
1010 Some(ProgressTracker::new(
1011 files.len(),
1012 self.config.progress_interval,
1013 self.config.verbose,
1014 ))
1015 } else {
1016 None
1017 };
1018
1019 let results: Vec<FileResult<O::Output>> = files
1020 .iter()
1021 .map(|path| {
1022 let result = operation.process_file(path);
1023
1024 if let Some(ref t) = tracker {
1025 match &result {
1026 Ok(_) => t.record_success(path),
1027 Err(e) => t.record_failure(path, e),
1028 }
1029 }
1030
1031 FileResult {
1032 path: path.clone(),
1033 result: result.map_err(|e| e.clone()),
1034 }
1035 })
1036 .collect();
1037
1038 if show_progress {
1039 if let Some(tracker) = tracker {
1040 tracker.print_summary(operation.name());
1041 }
1042 }
1043
1044 results
1045 }
1046
1047 /// Process files in parallel using Rayon.
1048 fn process_parallel<O>(
1049 &self,
1050 files: &[PathBuf],
1051 operation: &O,
1052 show_progress: bool,
1053 ) -> Vec<FileResult<O::Output>>
1054 where
1055 O: BatchOperation,
1056 {
1057 let tracker = if show_progress {
1058 Some(Arc::new(ProgressTracker::new(
1059 files.len(),
1060 self.config.progress_interval,
1061 self.config.verbose,
1062 )))
1063 } else {
1064 None
1065 };
1066
1067 let results: Vec<FileResult<O::Output>> = files
1068 .par_iter()
1069 .map(|path| {
1070 let result = operation.process_file(path);
1071
1072 if let Some(ref t) = tracker {
1073 match &result {
1074 Ok(_) => t.record_success(path),
1075 Err(e) => t.record_failure(path, e),
1076 }
1077 }
1078
1079 FileResult {
1080 path: path.clone(),
1081 result: result.map_err(|e| e.clone()),
1082 }
1083 })
1084 .collect();
1085
1086 if show_progress {
1087 if let Some(tracker) = tracker {
1088 tracker.print_summary(operation.name());
1089 }
1090 }
1091
1092 results
1093 }
1094
1095 /// Process files using streaming operations for memory efficiency.
1096 ///
1097 /// This method uses the streaming parser from `hedl-stream` to process files
1098 /// with constant memory usage regardless of file size. Ideal for:
1099 /// - Files larger than 100MB
1100 /// - Memory-constrained environments
1101 /// - Processing thousands of files
1102 ///
1103 /// # Arguments
1104 ///
1105 /// * `files` - Slice of file paths to process
1106 /// * `operation` - The streaming operation to perform
1107 /// * `show_progress` - Whether to show progress updates
1108 ///
1109 /// # Returns
1110 ///
1111 /// * `Ok(BatchResults)` - Always succeeds and collects all individual results
1112 /// * `Err(CliError)` - Only on catastrophic failures
1113 ///
1114 /// # Memory Usage
1115 ///
1116 /// Peak memory = `buffer_size` (8KB) × `num_threads` + ID tracking set
1117 ///
1118 /// # Examples
1119 ///
1120 /// ```rust,no_run
1121 /// use hedl_cli::batch::{BatchProcessor, StreamingValidationOperation, BatchConfig};
1122 /// use std::path::PathBuf;
1123 ///
1124 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1125 /// let processor = BatchProcessor::default_config();
1126 /// let files = vec![PathBuf::from("large-file.hedl")];
1127 /// let operation = StreamingValidationOperation { strict: false };
1128 ///
1129 /// let results = processor.process_streaming(&files, operation, true)?;
1130 /// println!("Processed {} files with constant memory", results.success_count());
1131 /// # Ok(())
1132 /// # }
1133 /// ```
1134 pub fn process_streaming<O>(
1135 &self,
1136 files: &[PathBuf],
1137 operation: O,
1138 show_progress: bool,
1139 ) -> Result<BatchResults<O::Output>, CliError>
1140 where
1141 O: StreamingBatchOperation,
1142 {
1143 let start_time = Instant::now();
1144
1145 if files.is_empty() {
1146 return Ok(BatchResults::new(vec![], 0));
1147 }
1148
1149 // Configure thread pool if max_threads is specified
1150 if let Some(max_threads) = self.config.max_threads {
1151 rayon::ThreadPoolBuilder::new()
1152 .num_threads(max_threads)
1153 .build_global()
1154 .ok(); // Ignore error if already initialized
1155 }
1156
1157 let results = if files.len() < self.config.parallel_threshold {
1158 self.process_streaming_serial(files, &operation, show_progress)
1159 } else {
1160 self.process_streaming_parallel(files, &operation, show_progress)
1161 };
1162
1163 let elapsed_ms = start_time.elapsed().as_millis();
1164 Ok(BatchResults::new(results, elapsed_ms))
1165 }
1166
1167 /// Process files serially using streaming.
1168 fn process_streaming_serial<O>(
1169 &self,
1170 files: &[PathBuf],
1171 operation: &O,
1172 show_progress: bool,
1173 ) -> Vec<FileResult<O::Output>>
1174 where
1175 O: StreamingBatchOperation,
1176 {
1177 let tracker = if show_progress {
1178 Some(ProgressTracker::new(
1179 files.len(),
1180 self.config.progress_interval,
1181 self.config.verbose,
1182 ))
1183 } else {
1184 None
1185 };
1186
1187 let results: Vec<FileResult<O::Output>> = files
1188 .iter()
1189 .map(|path| {
1190 let result = operation.process_file_streaming(path);
1191
1192 if let Some(ref t) = tracker {
1193 match &result {
1194 Ok(_) => t.record_success(path),
1195 Err(e) => t.record_failure(path, e),
1196 }
1197 }
1198
1199 FileResult {
1200 path: path.clone(),
1201 result: result.map_err(|e| e.clone()),
1202 }
1203 })
1204 .collect();
1205
1206 if show_progress {
1207 if let Some(tracker) = tracker {
1208 tracker.print_summary(operation.name());
1209 }
1210 }
1211
1212 results
1213 }
1214
1215 /// Process files in parallel using streaming.
1216 fn process_streaming_parallel<O>(
1217 &self,
1218 files: &[PathBuf],
1219 operation: &O,
1220 show_progress: bool,
1221 ) -> Vec<FileResult<O::Output>>
1222 where
1223 O: StreamingBatchOperation,
1224 {
1225 let tracker = if show_progress {
1226 Some(Arc::new(ProgressTracker::new(
1227 files.len(),
1228 self.config.progress_interval,
1229 self.config.verbose,
1230 )))
1231 } else {
1232 None
1233 };
1234
1235 let results: Vec<FileResult<O::Output>> = files
1236 .par_iter()
1237 .map(|path| {
1238 let result = operation.process_file_streaming(path);
1239
1240 if let Some(ref t) = tracker {
1241 match &result {
1242 Ok(_) => t.record_success(path),
1243 Err(e) => t.record_failure(path, e),
1244 }
1245 }
1246
1247 FileResult {
1248 path: path.clone(),
1249 result: result.map_err(|e| e.clone()),
1250 }
1251 })
1252 .collect();
1253
1254 if show_progress {
1255 if let Some(tracker) = tracker {
1256 tracker.print_summary(operation.name());
1257 }
1258 }
1259
1260 results
1261 }
1262
1263 /// Automatically choose between standard and streaming based on file size.
1264 ///
1265 /// Files larger than 100MB use streaming mode for memory efficiency,
1266 /// while smaller files use standard mode for better performance.
1267 ///
1268 /// # Arguments
1269 ///
1270 /// * `files` - Slice of file paths to process
1271 /// * `standard_op` - Standard operation for small files
1272 /// * `streaming_op` - Streaming operation for large files
1273 /// * `show_progress` - Whether to show progress updates
1274 ///
1275 /// # Returns
1276 ///
1277 /// * `Ok(BatchResults)` - Combined results from both modes
1278 /// * `Err(CliError)` - On catastrophic failures
1279 ///
1280 /// # Examples
1281 ///
1282 /// ```rust,no_run
1283 /// use hedl_cli::batch::{BatchProcessor, ValidationOperation, StreamingValidationOperation};
1284 /// use std::path::PathBuf;
1285 ///
1286 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1287 /// let processor = BatchProcessor::default_config();
1288 /// let files = vec![
1289 /// PathBuf::from("small.hedl"),
1290 /// PathBuf::from("large-200mb.hedl"),
1291 /// ];
1292 ///
1293 /// let results = processor.process_auto(
1294 /// &files,
1295 /// ValidationOperation { strict: false },
1296 /// StreamingValidationOperation { strict: false },
1297 /// true,
1298 /// )?;
1299 /// println!("Processed {} files", results.results.len());
1300 /// # Ok(())
1301 /// # }
1302 /// ```
1303 pub fn process_auto<O, SO>(
1304 &self,
1305 files: &[PathBuf],
1306 standard_op: O,
1307 streaming_op: SO,
1308 show_progress: bool,
1309 ) -> Result<BatchResults<O::Output>, CliError>
1310 where
1311 O: BatchOperation<Output = SO::Output>,
1312 SO: StreamingBatchOperation,
1313 {
1314 const STREAMING_THRESHOLD: u64 = 100 * 1024 * 1024; // 100MB
1315
1316 if files.is_empty() {
1317 return Ok(BatchResults::new(vec![], 0));
1318 }
1319
1320 let start_time = Instant::now();
1321
1322 // Partition files by size
1323 let mut small_files = Vec::new();
1324 let mut large_files = Vec::new();
1325
1326 for path in files {
1327 match std::fs::metadata(path) {
1328 Ok(meta) if meta.len() > STREAMING_THRESHOLD => {
1329 large_files.push(path.clone());
1330 }
1331 Ok(_) => {
1332 small_files.push(path.clone());
1333 }
1334 Err(_) => {
1335 // If we can't get size, treat as small
1336 small_files.push(path.clone());
1337 }
1338 }
1339 }
1340
1341 // Process small files with standard ops
1342 let mut all_results = if small_files.is_empty() {
1343 Vec::new()
1344 } else {
1345 self.process(&small_files, standard_op, show_progress)?
1346 .results
1347 };
1348
1349 // Process large files with streaming ops
1350 if !large_files.is_empty() {
1351 let streaming_results = self
1352 .process_streaming(&large_files, streaming_op, show_progress)?
1353 .results;
1354 all_results.extend(streaming_results);
1355 }
1356
1357 // Restore original order
1358 let file_order: Vec<&PathBuf> = files.iter().collect();
1359 all_results.sort_by_key(|r| {
1360 file_order
1361 .iter()
1362 .position(|&p| p == &r.path)
1363 .unwrap_or(usize::MAX)
1364 });
1365
1366 let elapsed_ms = start_time.elapsed().as_millis();
1367 Ok(BatchResults::new(all_results, elapsed_ms))
1368 }
1369}
1370
1371// ============================================================================
1372// Standard Operations
1373// ============================================================================
1374
/// Batch validation operation.
///
/// Validates multiple HEDL files in parallel, checking syntax and optionally
/// enforcing strict reference resolution.
///
/// Each successfully validated file yields a [`ValidationStats`] with node,
/// field, and list counts plus the set of seen `type:id` identifiers.
#[derive(Debug, Clone)]
pub struct ValidationOperation {
    /// Enable strict reference validation
    pub strict: bool,
}
1384
1385impl BatchOperation for ValidationOperation {
1386 type Output = ValidationStats;
1387
1388 fn process_file(&self, path: &Path) -> Result<Self::Output, CliError> {
1389 use hedl_core::{parse_with_limits, Item, Node, ParseOptions, ReferenceMode};
1390
1391 let content = std::fs::read_to_string(path).map_err(|e| CliError::io_error(path, e))?;
1392
1393 let options = ParseOptions {
1394 reference_mode: if self.strict {
1395 ReferenceMode::Strict
1396 } else {
1397 ReferenceMode::Lenient
1398 },
1399 ..ParseOptions::default()
1400 };
1401
1402 let doc = parse_with_limits(content.as_bytes(), options)
1403 .map_err(|e| CliError::parse(e.to_string()))?;
1404
1405 // Collect statistics from the parsed document
1406 let mut stats = ValidationStats::new();
1407
1408 // Get version from document metadata
1409 stats.version = format!("{}.{}", doc.version.0, doc.version.1);
1410
1411 // Recursive helper to count nodes
1412 fn count_node(node: &Node, stats: &mut ValidationStats) {
1413 stats.node_count += 1;
1414 stats.field_count += node.fields.len();
1415 let full_id = format!("{}:{}", node.type_name, node.id);
1416 stats.seen_ids.insert(full_id);
1417
1418 // Count children recursively
1419 if let Some(ref children) = node.children {
1420 for child_nodes in children.values() {
1421 for child in child_nodes {
1422 count_node(child, stats);
1423 }
1424 }
1425 }
1426 }
1427
1428 // Recursive helper to traverse items
1429 fn traverse_item(item: &Item, stats: &mut ValidationStats) {
1430 match item {
1431 Item::List(list) => {
1432 stats.list_count += 1;
1433 for node in &list.rows {
1434 count_node(node, stats);
1435 }
1436 }
1437 Item::Object(obj) => {
1438 for child_item in obj.values() {
1439 traverse_item(child_item, stats);
1440 }
1441 }
1442 Item::Scalar(_) => {
1443 // Scalars don't contribute to node counts
1444 }
1445 }
1446 }
1447
1448 // Traverse all items in the document root
1449 for item in doc.root.values() {
1450 traverse_item(item, &mut stats);
1451 }
1452
1453 Ok(stats)
1454 }
1455
1456 fn name(&self) -> &'static str {
1457 "validate"
1458 }
1459}
1460
/// Batch format operation.
///
/// Formats multiple HEDL files to canonical form, optionally checking if files
/// are already canonical.
///
/// `process_file` returns the canonical text and never writes to disk; in
/// `check` mode a non-canonical file fails with `CliError::NotCanonical`.
#[derive(Debug, Clone)]
pub struct FormatOperation {
    /// Only check if files are canonical (don't write)
    pub check: bool,
    /// Use ditto optimization
    pub ditto: bool,
    /// Add count hints to matrix lists
    pub with_counts: bool,
}
1474
1475impl BatchOperation for FormatOperation {
1476 type Output = String;
1477
1478 fn process_file(&self, path: &Path) -> Result<Self::Output, CliError> {
1479 use hedl_c14n::{canonicalize_with_config, CanonicalConfig};
1480 use hedl_core::parse;
1481
1482 let content = std::fs::read_to_string(path).map_err(|e| CliError::io_error(path, e))?;
1483
1484 let mut doc = parse(content.as_bytes()).map_err(|e| CliError::parse(e.to_string()))?;
1485
1486 // Add count hints if requested
1487 if self.with_counts {
1488 add_count_hints(&mut doc);
1489 }
1490
1491 let config = CanonicalConfig::new().with_ditto(self.ditto);
1492
1493 let canonical = canonicalize_with_config(&doc, &config)
1494 .map_err(|e| CliError::canonicalization(e.to_string()))?;
1495
1496 if self.check && canonical != content {
1497 return Err(CliError::NotCanonical);
1498 }
1499
1500 Ok(canonical)
1501 }
1502
1503 fn name(&self) -> &str {
1504 if self.check {
1505 "format-check"
1506 } else {
1507 "format"
1508 }
1509 }
1510}
1511
/// Batch lint operation.
///
/// Lints multiple HEDL files for best practices and common issues.
///
/// Each file yields its diagnostics rendered as strings; with `warn_error`
/// set, any diagnostic at all makes the file fail instead.
#[derive(Debug, Clone)]
pub struct LintOperation {
    /// Treat warnings as errors
    pub warn_error: bool,
}
1520
1521impl BatchOperation for LintOperation {
1522 type Output = Vec<String>;
1523
1524 fn process_file(&self, path: &Path) -> Result<Self::Output, CliError> {
1525 use hedl_core::parse;
1526 use hedl_lint::lint;
1527
1528 let content = std::fs::read_to_string(path).map_err(|e| CliError::io_error(path, e))?;
1529
1530 let doc = parse(content.as_bytes()).map_err(|e| CliError::parse(e.to_string()))?;
1531
1532 let diagnostics = lint(&doc);
1533
1534 if self.warn_error && !diagnostics.is_empty() {
1535 return Err(CliError::LintErrors);
1536 }
1537
1538 Ok(diagnostics
1539 .iter()
1540 .map(std::string::ToString::to_string)
1541 .collect())
1542 }
1543
1544 fn name(&self) -> &'static str {
1545 "lint"
1546 }
1547}
1548
1549// ============================================================================
1550// Streaming Operations
1551// ============================================================================
1552
/// Statistics collected during streaming validation.
///
/// Provides detailed statistics about the parsed document including
/// entity counts, field counts, and ID tracking for reference validation.
///
/// Also produced by the non-streaming [`ValidationOperation`], so both
/// validation paths report the same shape of data.
#[derive(Debug, Clone, Default)]
pub struct ValidationStats {
    /// HEDL version string
    pub version: String,
    /// Number of lists encountered
    pub list_count: usize,
    /// Total number of nodes processed
    pub node_count: usize,
    /// Total number of fields across all nodes
    pub field_count: usize,
    /// Set of seen IDs for strict reference validation (type:id format)
    pub seen_ids: HashSet<String>,
}
1570
1571impl ValidationStats {
1572 /// Create new empty validation statistics
1573 #[must_use]
1574 pub fn new() -> Self {
1575 Self::default()
1576 }
1577}
1578
/// Streaming validation operation for memory-efficient validation of large files.
///
/// Uses the streaming parser from `hedl-stream` to validate files with O(1) memory
/// usage regardless of file size. Ideal for:
/// - Files larger than 100MB
/// - Validating thousands of files with limited RAM
/// - Container environments with memory limits
///
/// # Memory Profile
///
/// - **Input**: O(1) - buffer size only (~8KB)
/// - **Working**: `O(n_ids)` - seen ID set for strict validation
/// - **Output**: O(1) - small statistics struct
/// - **Peak**: ~8KB + ID set size (vs. full file size in standard mode)
///
/// NOTE(review): in the current implementation `strict` does not change
/// streaming behavior — IDs are tracked either way and no cross-reference
/// check is performed at end of stream.
///
/// # Examples
///
/// ```rust,no_run
/// use hedl_cli::batch::{BatchProcessor, StreamingValidationOperation};
/// use std::path::PathBuf;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let processor = BatchProcessor::default_config();
/// let files = vec![PathBuf::from("large-file.hedl")];
///
/// let operation = StreamingValidationOperation { strict: false };
/// let results = processor.process_streaming(&files, operation, true)?;
///
/// println!("Validated {} files with constant memory", results.success_count());
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct StreamingValidationOperation {
    /// Enable strict reference validation
    pub strict: bool,
}
1616
1617impl StreamingBatchOperation for StreamingValidationOperation {
1618 type Output = ValidationStats;
1619
1620 fn process_file_streaming(&self, path: &Path) -> Result<Self::Output, CliError> {
1621 use hedl_stream::{NodeEvent, StreamError, StreamingParser};
1622 use std::fs::File;
1623 use std::io::BufReader;
1624
1625 let file = File::open(path).map_err(|e| CliError::io_error(path, e))?;
1626 let reader = BufReader::with_capacity(8192, file);
1627
1628 let parser = StreamingParser::new(reader)
1629 .map_err(|e: StreamError| CliError::parse(e.to_string()))?;
1630
1631 let mut stats = ValidationStats::new();
1632 let mut _current_type = String::new();
1633
1634 // Process events incrementally
1635 for event in parser {
1636 let event = event.map_err(|e: StreamError| CliError::parse(e.to_string()))?;
1637
1638 match event {
1639 NodeEvent::Header(info) => {
1640 // Validate version exists
1641 let version_str = format!("{}.{}", info.version.0, info.version.1);
1642 if version_str.is_empty() {
1643 return Err(CliError::parse("Missing VERSION".to_string()));
1644 }
1645 stats.version = version_str;
1646 }
1647 NodeEvent::ListStart { type_name, .. } => {
1648 stats.list_count += 1;
1649 _current_type = type_name;
1650 }
1651 NodeEvent::Node(node) => {
1652 stats.node_count += 1;
1653 stats.field_count += node.fields.len();
1654
1655 // Track IDs for strict mode validation
1656 let full_id = format!("{}:{}", node.type_name, node.id);
1657
1658 if self.strict {
1659 // In strict mode, validate references
1660 // For now, just track IDs - full reference validation
1661 // would require accumulating references and validating at end
1662 stats.seen_ids.insert(full_id);
1663 } else {
1664 stats.seen_ids.insert(full_id);
1665 }
1666 }
1667 NodeEvent::ListEnd { .. } => {
1668 // List validation complete
1669 }
1670 NodeEvent::Scalar { .. } => {
1671 // Scalar validation - no action needed
1672 }
1673 NodeEvent::ObjectStart { .. } => {
1674 // Object start - no action needed
1675 }
1676 NodeEvent::ObjectEnd { .. } => {
1677 // Object end - no action needed
1678 }
1679 NodeEvent::EndOfDocument => {
1680 // Document complete
1681 break;
1682 }
1683 }
1684 }
1685
1686 Ok(stats)
1687 }
1688
1689 fn name(&self) -> &'static str {
1690 "validate-streaming"
1691 }
1692
1693 fn supports_streaming(&self) -> bool {
1694 true
1695 }
1696}
1697
1698// ============================================================================
1699// Helper Functions for Count Hints
1700// ============================================================================
1701
1702/// Recursively add count hints to all matrix lists in the document
1703fn add_count_hints(doc: &mut hedl_core::Document) {
1704 for item in doc.root.values_mut() {
1705 add_count_hints_to_item(item);
1706 }
1707}
1708
1709/// Recursively add count hints to an item
1710fn add_count_hints_to_item(item: &mut hedl_core::Item) {
1711 use hedl_core::Item;
1712
1713 match item {
1714 Item::List(list) => {
1715 // Set count hint based on actual row count
1716 list.count_hint = Some(list.rows.len());
1717
1718 // Recursively add child counts to each node
1719 for node in &mut list.rows {
1720 add_child_count_to_node(node);
1721 }
1722 }
1723 Item::Object(map) => {
1724 // Recursively process nested objects
1725 for nested_item in map.values_mut() {
1726 add_count_hints_to_item(nested_item);
1727 }
1728 }
1729 Item::Scalar(_) => {
1730 // Scalars don't have matrix lists
1731 }
1732 }
1733}
1734
1735/// Recursively set `child_count` on nodes that have children
1736fn add_child_count_to_node(node: &mut hedl_core::Node) {
1737 // Calculate total number of direct children across all child types
1738 let total_children: usize = node
1739 .children()
1740 .map_or(0, |c| c.values().map(std::vec::Vec::len).sum());
1741
1742 if total_children > 0 {
1743 node.child_count = total_children.min(u16::MAX as usize) as u16;
1744
1745 // Recursively process all child nodes
1746 if let Some(children) = node.children_mut() {
1747 for child_list in children.values_mut() {
1748 for child_node in child_list {
1749 add_child_count_to_node(child_node);
1750 }
1751 }
1752 }
1753 }
1754}
1755
1756#[cfg(test)]
1757mod tests {
1758 use super::*;
1759 use serial_test::serial;
1760
    #[test]
    fn test_batch_config_default() {
        // Defaults: serial below 10 files, global rayon pool (no
        // max_threads), report every file, quiet output.
        let config = BatchConfig::default();
        assert_eq!(config.parallel_threshold, 10);
        assert!(config.max_threads.is_none());
        assert_eq!(config.progress_interval, 1);
        assert!(!config.verbose);
    }

    #[test]
    fn test_file_result_success() {
        let result = FileResult::success(PathBuf::from("test.hedl"), 42);
        assert!(result.is_success());
        assert!(!result.is_failure());
        assert_eq!(result.result.unwrap(), 42);
    }

    #[test]
    fn test_file_result_failure() {
        let result: FileResult<()> =
            FileResult::failure(PathBuf::from("test.hedl"), CliError::NotCanonical);
        assert!(!result.is_success());
        assert!(result.is_failure());
        assert!(result.result.is_err());
    }

    #[test]
    fn test_batch_results_statistics() {
        // Mixed batch (2 successes + 1 failure) exercises every accessor.
        let results = vec![
            FileResult::success(PathBuf::from("a.hedl"), ()),
            FileResult::success(PathBuf::from("b.hedl"), ()),
            FileResult::failure(PathBuf::from("c.hedl"), CliError::NotCanonical),
        ];

        let batch = BatchResults::new(results, 1000);

        assert_eq!(batch.total_files(), 3);
        assert_eq!(batch.success_count(), 2);
        assert_eq!(batch.failure_count(), 1);
        assert!(!batch.all_succeeded());
        assert!(batch.has_failures());
        assert_eq!(batch.successes().count(), 2);
        assert_eq!(batch.failures().count(), 1);
    }

    #[test]
    fn test_batch_results_throughput() {
        let results = vec![
            FileResult::success(PathBuf::from("a.hedl"), ()),
            FileResult::success(PathBuf::from("b.hedl"), ()),
        ];

        // 2 files in 1000ms => 2 files/sec (float tolerance of 0.01).
        let batch = BatchResults::new(results, 1000); // 1 second
        assert!((batch.throughput() - 2.0).abs() < 0.01);

        // Zero elapsed time must yield 0.0, not a division-by-zero artifact.
        let batch_zero: BatchResults<()> = BatchResults::new(vec![], 0);
        assert_eq!(batch_zero.throughput(), 0.0);
    }

    #[test]
    fn test_progress_tracker_should_report() {
        // With interval 10, reports fire on multiples of 10 and on the
        // final file.
        let tracker = ProgressTracker::new(100, 10, false);

        assert!(!tracker.should_report(1));
        assert!(!tracker.should_report(9));
        assert!(tracker.should_report(10)); // Interval boundary
        assert!(tracker.should_report(100)); // End
    }
1829
1830 // Mock operation for testing
1831 struct MockOperation {
1832 should_fail: bool,
1833 }
1834
1835 impl BatchOperation for MockOperation {
1836 type Output = String;
1837
1838 fn process_file(&self, path: &Path) -> Result<Self::Output, CliError> {
1839 if self.should_fail {
1840 Err(CliError::NotCanonical)
1841 } else {
1842 Ok(path.to_string_lossy().to_string())
1843 }
1844 }
1845
1846 fn name(&self) -> &'static str {
1847 "mock"
1848 }
1849 }
1850
    // ------------------------------------------------------------------
    // Empty-batch behavior: an empty file list is always Ok, never Err.
    // ------------------------------------------------------------------

    #[test]
    fn test_batch_processor_empty() {
        let processor = BatchProcessor::default_config();
        let results = processor
            .process(&[], MockOperation { should_fail: false }, false)
            .unwrap();

        assert_eq!(results.total_files(), 0);
        assert!(results.all_succeeded());
    }

    #[test]
    fn test_batch_processor_empty_with_progress_shows_warning() {
        // This test verifies that empty file list with show_progress=true
        // completes successfully (does not panic or return an error).
        // The actual warning output goes to stderr and is difficult to capture
        // in unit tests, but integration tests verify the output.
        let processor = BatchProcessor::default_config();

        let results = processor
            .process(&[], MockOperation { should_fail: false }, true)
            .unwrap();

        // Empty batch should succeed (not error)
        assert_eq!(results.total_files(), 0);
        assert_eq!(results.success_count(), 0);
        assert_eq!(results.failure_count(), 0);
        assert!(results.all_succeeded());
    }

    #[test]
    fn test_batch_processor_empty_without_progress_silent() {
        // Verify that empty file list with show_progress=false succeeds silently
        let processor = BatchProcessor::default_config();

        let results = processor
            .process(&[], MockOperation { should_fail: false }, false)
            .unwrap();

        assert_eq!(results.total_files(), 0);
        assert!(results.all_succeeded());
        // No warning should be printed (verified via integration test)
    }

    #[test]
    fn test_empty_batch_returns_ok_not_error() {
        // Ensure backward compatibility: empty batch is NOT an error condition
        let processor = BatchProcessor::default_config();

        let result = processor.process(&[], MockOperation { should_fail: false }, true);

        // Empty batch should return Ok, not Err
        assert!(result.is_ok());

        let results = result.unwrap();
        assert_eq!(results.total_files(), 0);
        assert_eq!(results.success_count(), 0);
        assert_eq!(results.failure_count(), 0);
    }
1910
    #[test]
    fn test_batch_processor_serial_success() {
        // A threshold of 100 forces the serial path for a 3-file batch.
        let processor = BatchProcessor::new(BatchConfig {
            parallel_threshold: 100, // Force serial for small batch
            ..Default::default()
        });

        let files = vec![
            PathBuf::from("a.hedl"),
            PathBuf::from("b.hedl"),
            PathBuf::from("c.hedl"),
        ];

        let results = processor
            .process(&files, MockOperation { should_fail: false }, false)
            .unwrap();

        assert_eq!(results.total_files(), 3);
        assert_eq!(results.success_count(), 3);
        assert_eq!(results.failure_count(), 0);
        assert!(results.all_succeeded());
    }

    #[test]
    fn test_batch_processor_serial_with_failures() {
        // Per-file failures are collected; the batch call itself returns Ok.
        let processor = BatchProcessor::new(BatchConfig {
            parallel_threshold: 100,
            ..Default::default()
        });

        let files = vec![PathBuf::from("a.hedl"), PathBuf::from("b.hedl")];

        let results = processor
            .process(&files, MockOperation { should_fail: true }, false)
            .unwrap();

        assert_eq!(results.total_files(), 2);
        assert_eq!(results.success_count(), 0);
        assert_eq!(results.failure_count(), 2);
        assert!(!results.all_succeeded());
        assert!(results.has_failures());
    }

    #[test]
    fn test_batch_processor_parallel() {
        // 20 files with threshold 2 exercises the rayon parallel path.
        let processor = BatchProcessor::new(BatchConfig {
            parallel_threshold: 2, // Force parallel
            ..Default::default()
        });

        let files: Vec<PathBuf> = (0..20)
            .map(|i| PathBuf::from(format!("file{i}.hedl")))
            .collect();

        let results = processor
            .process(&files, MockOperation { should_fail: false }, false)
            .unwrap();

        assert_eq!(results.total_files(), 20);
        assert_eq!(results.success_count(), 20);
    }
1972
    #[test]
    fn test_validate_file_count_within_limit() {
        assert!(validate_file_count(100, Some(1000)).is_ok());
    }

    #[test]
    fn test_validate_file_count_at_limit() {
        // The limit is inclusive: exactly at the limit is still Ok.
        assert!(validate_file_count(1000, Some(1000)).is_ok());
    }

    #[test]
    fn test_validate_file_count_exceeds_limit() {
        let result = validate_file_count(2000, Some(1000));
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("exceeds maximum limit"));
    }

    #[test]
    fn test_validate_file_count_unlimited() {
        // None = unlimited
        assert!(validate_file_count(1_000_000, None).is_ok());
    }

    #[test]
    fn test_validate_file_count_zero_files() {
        // Zero files always OK regardless of limit
        assert!(validate_file_count(0, Some(100)).is_ok());
    }

    // The next tests mutate the process-wide HEDL_MAX_BATCH_FILES variable,
    // hence #[serial] to keep them from racing each other.
    #[test]
    #[serial]
    fn test_get_max_batch_files_default() {
        std::env::remove_var("HEDL_MAX_BATCH_FILES");
        let max = get_max_batch_files();
        assert_eq!(max, 10_000);
    }

    #[test]
    #[serial]
    fn test_get_max_batch_files_env_override() {
        std::env::set_var("HEDL_MAX_BATCH_FILES", "50000");
        let max = get_max_batch_files();
        assert_eq!(max, 50_000);
        std::env::remove_var("HEDL_MAX_BATCH_FILES");
    }

    #[test]
    #[serial]
    fn test_get_max_batch_files_invalid_env() {
        // Non-numeric values are ignored rather than panicking.
        std::env::set_var("HEDL_MAX_BATCH_FILES", "invalid");
        let max = get_max_batch_files();
        assert_eq!(max, 10_000); // Falls back to default
        std::env::remove_var("HEDL_MAX_BATCH_FILES");
    }

    #[test]
    #[serial]
    fn test_batch_config_default_has_limit() {
        std::env::remove_var("HEDL_MAX_BATCH_FILES");
        let config = BatchConfig::default();
        assert!(config.max_files.is_some());
        assert_eq!(config.max_files.unwrap(), 10_000);
    }

    #[test]
    fn test_warn_large_batch_above_threshold() {
        // Note: This test just verifies no panic, can't easily test stderr output
        warn_large_batch(5000, false);
    }

    #[test]
    fn test_warn_large_batch_below_threshold() {
        warn_large_batch(500, false);
    }

    #[test]
    fn test_warn_large_batch_verbose_suppresses() {
        warn_large_batch(5000, true);
    }
2053
2054 // ============================================================================
2055 // Thread Pool Tests
2056 // ============================================================================
2057
2058 #[test]
2059 fn test_local_thread_pool_creation() {
2060 let processor = BatchProcessor::new(BatchConfig {
2061 max_threads: Some(2),
2062 parallel_threshold: 1, // Force parallel even with 2 files
2063 ..Default::default()
2064 });
2065
2066 let files = vec![PathBuf::from("test1.hedl"), PathBuf::from("test2.hedl")];
2067
2068 let results = processor.process(&files, MockOperation { should_fail: false }, false);
2069 assert!(results.is_ok());
2070
2071 let results = results.unwrap();
2072 assert_eq!(results.total_files(), 2);
2073 assert_eq!(results.success_count(), 2);
2074 assert_eq!(results.failure_count(), 0);
2075 }
2076
2077 #[test]
2078 fn test_invalid_thread_count() {
2079 let processor = BatchProcessor::new(BatchConfig {
2080 max_threads: Some(0), // Invalid: zero threads
2081 parallel_threshold: 1,
2082 ..Default::default()
2083 });
2084
2085 let files = vec![PathBuf::from("test.hedl")];
2086 let results = processor.process(&files, MockOperation { should_fail: false }, false);
2087
2088 assert!(results.is_err());
2089 match results {
2090 Err(CliError::ThreadPoolError {
2091 requested_threads, ..
2092 }) => {
2093 assert_eq!(requested_threads, 0);
2094 }
2095 _ => panic!("Expected ThreadPoolError, got: {results:?}"),
2096 }
2097 }
2098
2099 #[test]
2100 fn test_concurrent_batch_operations_different_pools() {
2101 use std::sync::Arc;
2102 use std::thread;
2103
2104 let files = vec![PathBuf::from("test1.hedl"), PathBuf::from("test2.hedl")];
2105
2106 // Run two batch operations concurrently with different thread counts
2107 let processor1 = Arc::new(BatchProcessor::new(BatchConfig {
2108 max_threads: Some(2),
2109 parallel_threshold: 1,
2110 ..Default::default()
2111 }));
2112
2113 let processor2 = Arc::new(BatchProcessor::new(BatchConfig {
2114 max_threads: Some(4),
2115 parallel_threshold: 1,
2116 ..Default::default()
2117 }));
2118
2119 let files1 = files.clone();
2120 let p1 = processor1.clone();
2121 let handle1 =
2122 thread::spawn(move || p1.process(&files1, MockOperation { should_fail: false }, false));
2123
2124 let files2 = files.clone();
2125 let p2 = processor2.clone();
2126 let handle2 =
2127 thread::spawn(move || p2.process(&files2, MockOperation { should_fail: false }, false));
2128
2129 // Both should succeed with their respective configurations
2130 let result1 = handle1.join().unwrap();
2131 let result2 = handle2.join().unwrap();
2132
2133 assert!(result1.is_ok(), "First processor should succeed");
2134 assert!(result2.is_ok(), "Second processor should succeed");
2135
2136 let results1 = result1.unwrap();
2137 let results2 = result2.unwrap();
2138
2139 assert_eq!(results1.total_files(), 2);
2140 assert_eq!(results1.success_count(), 2);
2141 assert_eq!(results2.total_files(), 2);
2142 assert_eq!(results2.success_count(), 2);
2143 }
2144
2145 #[test]
2146 fn test_default_config_uses_global_pool() {
2147 // Verify that default config (no max_threads) doesn't create local pool
2148 let processor = BatchProcessor::default_config();
2149
2150 let files = vec![
2151 PathBuf::from("test1.hedl"),
2152 PathBuf::from("test2.hedl"),
2153 PathBuf::from("test3.hedl"),
2154 PathBuf::from("test4.hedl"),
2155 PathBuf::from("test5.hedl"),
2156 PathBuf::from("test6.hedl"),
2157 PathBuf::from("test7.hedl"),
2158 PathBuf::from("test8.hedl"),
2159 PathBuf::from("test9.hedl"),
2160 PathBuf::from("test10.hedl"),
2161 ];
2162
2163 let results = processor.process(&files, MockOperation { should_fail: false }, false);
2164 assert!(results.is_ok());
2165
2166 let results = results.unwrap();
2167 assert_eq!(results.total_files(), 10);
2168 assert_eq!(results.success_count(), 10);
2169 // This should use global pool, not create a local one
2170 }
2171
2172 #[test]
2173 fn test_local_pool_with_failures() {
2174 // Verify that local thread pool works correctly even when operations fail
2175 let processor = BatchProcessor::new(BatchConfig {
2176 max_threads: Some(3),
2177 parallel_threshold: 1,
2178 ..Default::default()
2179 });
2180
2181 let files = vec![
2182 PathBuf::from("test1.hedl"),
2183 PathBuf::from("test2.hedl"),
2184 PathBuf::from("test3.hedl"),
2185 ];
2186
2187 let results = processor.process(&files, MockOperation { should_fail: true }, false);
2188 assert!(results.is_ok());
2189
2190 let results = results.unwrap();
2191 assert_eq!(results.total_files(), 3);
2192 assert_eq!(results.success_count(), 0);
2193 assert_eq!(results.failure_count(), 3);
2194 }
2195
2196 #[test]
2197 fn test_serial_processing_ignores_max_threads() {
2198 // When file count is below parallel_threshold, max_threads should be ignored
2199 let processor = BatchProcessor::new(BatchConfig {
2200 max_threads: Some(8),
2201 parallel_threshold: 100, // High threshold forces serial
2202 ..Default::default()
2203 });
2204
2205 let files = vec![PathBuf::from("test1.hedl"), PathBuf::from("test2.hedl")];
2206
2207 let results = processor.process(&files, MockOperation { should_fail: false }, false);
2208 assert!(results.is_ok());
2209
2210 let results = results.unwrap();
2211 assert_eq!(results.total_files(), 2);
2212 assert_eq!(results.success_count(), 2);
2213 }
2214
2215 #[test]
2216 fn test_local_pool_single_thread() {
2217 // Test that a local pool with just 1 thread works correctly
2218 let processor = BatchProcessor::new(BatchConfig {
2219 max_threads: Some(1),
2220 parallel_threshold: 1,
2221 ..Default::default()
2222 });
2223
2224 let files = vec![
2225 PathBuf::from("test1.hedl"),
2226 PathBuf::from("test2.hedl"),
2227 PathBuf::from("test3.hedl"),
2228 ];
2229
2230 let results = processor.process(&files, MockOperation { should_fail: false }, false);
2231 assert!(results.is_ok());
2232
2233 let results = results.unwrap();
2234 assert_eq!(results.total_files(), 3);
2235 assert_eq!(results.success_count(), 3);
2236 }
2237
2238 #[test]
2239 fn test_local_pool_many_threads() {
2240 // Test that a local pool with many threads works correctly
2241 let processor = BatchProcessor::new(BatchConfig {
2242 max_threads: Some(16),
2243 parallel_threshold: 1,
2244 ..Default::default()
2245 });
2246
2247 let files: Vec<PathBuf> = (0..32)
2248 .map(|i| PathBuf::from(format!("file{i}.hedl")))
2249 .collect();
2250
2251 let results = processor.process(&files, MockOperation { should_fail: false }, false);
2252 assert!(results.is_ok());
2253
2254 let results = results.unwrap();
2255 assert_eq!(results.total_files(), 32);
2256 assert_eq!(results.success_count(), 32);
2257 }
2258
2259 #[test]
2260 fn test_thread_pool_error_message() {
2261 let processor = BatchProcessor::new(BatchConfig {
2262 max_threads: Some(0),
2263 parallel_threshold: 1,
2264 ..Default::default()
2265 });
2266
2267 let files = vec![PathBuf::from("test.hedl")];
2268 let result = processor.process(&files, MockOperation { should_fail: false }, false);
2269
2270 match result {
2271 Err(CliError::ThreadPoolError {
2272 message,
2273 requested_threads,
2274 }) => {
2275 assert_eq!(requested_threads, 0);
2276 assert!(message.contains("0 threads"), "Message: {message}");
2277 }
2278 _ => panic!("Expected ThreadPoolError"),
2279 }
2280 }
2281}