Skip to main content

hedl_cli/batch/
mod.rs

1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Batch processing for multiple HEDL files with parallel execution and progress reporting.
19//!
20//! This module provides efficient batch processing capabilities for operations on multiple
21//! HEDL files. It uses Rayon for parallel processing when beneficial and provides real-time
22//! progress reporting with detailed error tracking.
23//!
24//! # Features
25//!
26//! - **Parallel Processing**: Automatic parallelization using Rayon's work-stealing scheduler
27//! - **Progress Reporting**: Real-time progress with file counts and success/failure tracking
28//! - **Error Resilience**: Continues processing on errors, collecting all failures for reporting
29//! - **Performance Optimization**: Intelligent parallel/serial mode selection based on workload
30//! - **Type Safety**: Strongly typed operation definitions with compile-time guarantees
31//!
32//! # Architecture
33//!
34//! The batch processing system uses a functional architecture with:
35//! - Operation trait for extensible batch operations
36//! - Result aggregation with detailed error context
37//! - Atomic counters for thread-safe progress tracking
38//! - Zero-copy file path handling
39//!
40//! # Examples
41//!
42//! ```rust,no_run
43//! use hedl_cli::batch::{BatchExecutor, BatchConfig, ValidationOperation};
44//! use std::path::PathBuf;
45//!
46//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
47//! // Create a batch processor with default configuration
48//! let processor = BatchExecutor::new(BatchConfig::default());
49//!
50//! // Validate multiple files in parallel
51//! let files = vec![
52//!     PathBuf::from("file1.hedl"),
53//!     PathBuf::from("file2.hedl"),
54//!     PathBuf::from("file3.hedl"),
55//! ];
56//!
57//! let operation = ValidationOperation { strict: true };
58//! let results = processor.process(&files, operation, true)?;
59//!
60//! println!("Processed {} files, {} succeeded, {} failed",
61//!     results.total_files(),
62//!     results.success_count(),
63//!     results.failure_count()
64//! );
65//! # Ok(())
66//! # }
67//! ```
68//!
69//! # Performance Characteristics
70//!
71//! - **Small batches (< 10 files)**: Serial processing to avoid overhead
72//! - **Medium batches (10-100 files)**: Parallel with Rayon thread pool
73//! - **Large batches (> 100 files)**: Chunked parallel processing with progress updates
74//!
75//! # Thread Safety
76//!
77//! All progress tracking uses atomic operations for lock-free concurrent access.
78//! Operations are required to be Send + Sync for parallel execution.
79//!
80//! # Thread Pool Management
81//!
82//! The batch processor supports two thread pool strategies:
83//!
84//! ## Global Thread Pool (Default)
85//!
86//! When `max_threads` is `None`, operations use Rayon's global thread pool:
87//! - Zero overhead (no pool creation)
88//! - Shared across all Rayon operations in the process
89//! - Thread count typically matches CPU core count
90//!
91//! ## Local Thread Pool (Isolated)
92//!
93//! When `max_threads` is `Some(n)`, each operation creates an isolated local pool:
94//! - Guaranteed thread count of exactly `n` threads
95//! - No global state pollution
96//! - Supports concurrent operations with different configurations
97//! - Small creation overhead (~0.5-1ms) and memory cost (~2-8MB per thread)
98//!
//! ## Example: Concurrent Executors with Isolated Pools
100//!
101//! ```rust,no_run
102//! use hedl_cli::batch::{BatchExecutor, BatchConfig};
103//! use std::path::PathBuf;
104//!
105//! // Concurrent operations with different thread counts
106//! use std::thread;
107//!
108//! let files: Vec<PathBuf> = vec!["a.hedl".into(), "b.hedl".into()];
109//!
110//! let handle1 = thread::spawn(|| {
111//!     let processor = BatchExecutor::new(BatchConfig {
112//!         max_threads: Some(2),
113//!         ..Default::default()
114//!     });
115//!     // Uses 2 threads
116//! });
117//!
118//! let handle2 = thread::spawn(|| {
119//!     let processor = BatchExecutor::new(BatchConfig {
120//!         max_threads: Some(4),
121//!         ..Default::default()
122//!     });
123//!     // Uses 4 threads, isolated from handle1
124//! });
125//! ```
126
127mod config;
128mod executor;
129mod operations;
130mod results;
131mod traits;
132
133// Re-export public API
134pub use config::{get_max_batch_files, validate_file_count, warn_large_batch, BatchConfig};
135pub use executor::BatchExecutor;
136pub use operations::{
137    FormatOperation, LintOperation, StreamingValidationOperation, ValidationOperation,
138    ValidationStats,
139};
140pub use results::{BatchResults, FileResult};
141pub use traits::{BatchOperation, StreamingBatchOperation};
142
143#[cfg(test)]
144mod tests {
145    use super::*;
146    use crate::error::CliError;
147    use serial_test::serial;
148    use std::path::{Path, PathBuf};
149
/// Checks the field values produced by `BatchConfig::default()`:
/// a parallel threshold of 10, no explicit thread cap, a progress
/// interval of 1, and verbose output switched off.
#[test]
fn test_batch_config_default() {
    let defaults = BatchConfig::default();

    assert!(!defaults.verbose);
    assert!(defaults.max_threads.is_none());
    assert_eq!(defaults.progress_interval, 1);
    assert_eq!(defaults.parallel_threshold, 10);
}
158
/// A `FileResult` built with `success` reports success (and not failure)
/// and hands back the payload it was constructed with.
#[test]
fn test_file_result_success() {
    let path = PathBuf::from("test.hedl");
    let outcome = FileResult::success(path, 42);

    assert!(outcome.is_success());
    assert!(!outcome.is_failure());

    let payload = outcome.result.unwrap();
    assert_eq!(payload, 42);
}
166
/// A `FileResult` built with `failure` reports failure (and not success)
/// and stores an `Err` in its `result` field.
#[test]
fn test_file_result_failure() {
    let path = PathBuf::from("test.hedl");
    let outcome = FileResult::<()>::failure(path, CliError::NotCanonical);

    assert!(!outcome.is_success());
    assert!(outcome.is_failure());
    assert!(outcome.result.is_err());
}
175
/// Two successes plus one failure must be reflected consistently by every
/// counting and filtering accessor on `BatchResults`.
#[test]
fn test_batch_results_statistics() {
    let mut outcomes = Vec::with_capacity(3);
    for name in ["a.hedl", "b.hedl"] {
        outcomes.push(FileResult::success(PathBuf::from(name), ()));
    }
    outcomes.push(FileResult::failure(
        PathBuf::from("c.hedl"),
        CliError::NotCanonical,
    ));

    let summary = BatchResults::new(outcomes, 1000);

    // Aggregate counters.
    assert_eq!(summary.total_files(), 3);
    assert_eq!(summary.success_count(), 2);
    assert_eq!(summary.failure_count(), 1);

    // Boolean summaries.
    assert!(!summary.all_succeeded());
    assert!(summary.has_failures());

    // Iterators must agree with the counters.
    assert_eq!(summary.successes().count(), 2);
    assert_eq!(summary.failures().count(), 1);
}
194
/// Two files with an elapsed value of 1000 (one second, per the original
/// test's note) should report a throughput of about 2.0; an empty batch with
/// zero elapsed time should report exactly 0.0.
#[test]
fn test_batch_results_throughput() {
    let two_ok = vec![
        FileResult::success(PathBuf::from("a.hedl"), ()),
        FileResult::success(PathBuf::from("b.hedl"), ()),
    ];
    let one_second = BatchResults::new(two_ok, 1000);
    let deviation = (one_second.throughput() - 2.0).abs();
    assert!(deviation < 0.01);

    let empty = BatchResults::<()>::new(Vec::new(), 0);
    assert_eq!(empty.throughput(), 0.0);
}
208
209    // Mock operation for testing
210    struct MockOperation {
211        should_fail: bool,
212    }
213
214    impl BatchOperation for MockOperation {
215        type Output = String;
216
217        fn process_file(&self, path: &Path) -> Result<Self::Output, CliError> {
218            if self.should_fail {
219                Err(CliError::NotCanonical)
220            } else {
221                Ok(path.to_string_lossy().to_string())
222            }
223        }
224
225        fn name(&self) -> &'static str {
226            "mock"
227        }
228    }
229
/// Processing an empty file list yields an empty result set that still
/// counts as "all succeeded".
#[test]
fn test_batch_processor_empty() {
    let no_files: &[PathBuf] = &[];
    let executor = BatchExecutor::default_config();

    let summary = executor
        .process(no_files, MockOperation { should_fail: false }, false)
        .unwrap();

    assert!(summary.all_succeeded());
    assert_eq!(summary.total_files(), 0);
}
240
/// An empty file list with `show_progress = true` must complete without
/// panicking or returning an error.
///
/// The warning itself is written to stderr, which is awkward to capture in
/// a unit test; the original notes that integration tests cover the output.
#[test]
fn test_batch_processor_empty_with_progress_shows_warning() {
    let no_files: &[PathBuf] = &[];
    let executor = BatchExecutor::default_config();

    let summary = executor
        .process(no_files, MockOperation { should_fail: false }, true)
        .unwrap();

    // An empty batch is a success, not an error, and every counter is zero.
    assert_eq!(summary.total_files(), 0);
    assert_eq!(summary.success_count(), 0);
    assert_eq!(summary.failure_count(), 0);
    assert!(summary.all_succeeded());
}
259
/// An empty file list with `show_progress = false` succeeds.
///
/// That nothing is printed in this mode is checked by an integration test
/// (per the original note), not here.
#[test]
fn test_batch_processor_empty_without_progress_silent() {
    let no_files: &[PathBuf] = &[];
    let executor = BatchExecutor::default_config();

    let summary = executor
        .process(no_files, MockOperation { should_fail: false }, false)
        .unwrap();

    assert_eq!(summary.total_files(), 0);
    assert!(summary.all_succeeded());
}
273
/// Backward-compatibility guard: an empty batch returns `Ok` with all
/// counters at zero rather than an `Err`.
#[test]
fn test_empty_batch_returns_ok_not_error() {
    let no_files: &[PathBuf] = &[];
    let executor = BatchExecutor::default_config();

    let outcome = executor.process(no_files, MockOperation { should_fail: false }, true);

    // The call itself must not be treated as an error condition.
    assert!(outcome.is_ok());

    let summary = outcome.unwrap();
    assert_eq!(summary.total_files(), 0);
    assert_eq!(summary.success_count(), 0);
    assert_eq!(summary.failure_count(), 0);
}
289
/// Three files processed with a `parallel_threshold` of 100 (chosen by the
/// original test to force the serial path) must all succeed.
#[test]
fn test_batch_processor_serial_success() {
    let config = BatchConfig {
        parallel_threshold: 100, // well above the 3 files used below
        ..Default::default()
    };
    let executor = BatchExecutor::new(config);

    let files: Vec<PathBuf> = ["a.hedl", "b.hedl", "c.hedl"]
        .iter()
        .map(PathBuf::from)
        .collect();

    let summary = executor
        .process(&files, MockOperation { should_fail: false }, false)
        .unwrap();

    assert_eq!(summary.total_files(), 3);
    assert_eq!(summary.success_count(), 3);
    assert_eq!(summary.failure_count(), 0);
    assert!(summary.all_succeeded());
}
312
/// With a failing operation and a `parallel_threshold` of 100, both files
/// are reported as failures while the batch call itself still returns `Ok`.
#[test]
fn test_batch_processor_serial_with_failures() {
    let config = BatchConfig {
        parallel_threshold: 100,
        ..Default::default()
    };
    let executor = BatchExecutor::new(config);

    let files: Vec<PathBuf> = ["a.hedl", "b.hedl"].iter().map(PathBuf::from).collect();

    let summary = executor
        .process(&files, MockOperation { should_fail: true }, false)
        .unwrap();

    assert_eq!(summary.total_files(), 2);
    assert_eq!(summary.success_count(), 0);
    assert_eq!(summary.failure_count(), 2);
    assert!(!summary.all_succeeded());
    assert!(summary.has_failures());
}
332
/// Twenty files with a `parallel_threshold` of 2 (chosen by the original
/// test to force the parallel path) must all be processed successfully.
#[test]
fn test_batch_processor_parallel() {
    let config = BatchConfig {
        parallel_threshold: 2, // far below the 20 files used below
        ..Default::default()
    };
    let executor = BatchExecutor::new(config);

    let mut files: Vec<PathBuf> = Vec::with_capacity(20);
    for i in 0..20 {
        files.push(PathBuf::from(format!("file{i}.hedl")));
    }

    let summary = executor
        .process(&files, MockOperation { should_fail: false }, false)
        .unwrap();

    assert_eq!(summary.total_files(), 20);
    assert_eq!(summary.success_count(), 20);
}
351
/// A count below the configured limit (100 of 1000) is accepted.
#[test]
fn test_validate_file_count_within_limit() {
    let limit = Some(1000);
    let outcome = validate_file_count(100, limit);
    assert!(outcome.is_ok());
}
356
/// A count exactly equal to the limit (1000 of 1000) is still accepted.
#[test]
fn test_validate_file_count_at_limit() {
    let limit = Some(1000);
    let outcome = validate_file_count(1000, limit);
    assert!(outcome.is_ok());
}
361
/// A count above the limit (2000 of 1000) is rejected, and the error text
/// mentions that the maximum was exceeded.
#[test]
fn test_validate_file_count_exceeds_limit() {
    let outcome = validate_file_count(2000, Some(1000));
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("exceeds maximum limit"));
}
369
/// Passing `None` as the limit means "unlimited": even a million files pass.
#[test]
fn test_validate_file_count_unlimited() {
    let no_limit = None;
    let outcome = validate_file_count(1_000_000, no_limit);
    assert!(outcome.is_ok());
}
375
/// Zero files is accepted when a limit is configured.
#[test]
fn test_validate_file_count_zero_files() {
    let limit = Some(100);
    let outcome = validate_file_count(0, limit);
    assert!(outcome.is_ok());
}
381
/// With `HEDL_MAX_BATCH_FILES` unset, the maximum batch size is 10 000.
///
/// Marked `#[serial]` because it mutates the process environment.
#[test]
#[serial]
fn test_get_max_batch_files_default() {
    // Start from a clean environment so nothing overrides the default.
    std::env::remove_var("HEDL_MAX_BATCH_FILES");

    assert_eq!(get_max_batch_files(), 10_000);
}
389
/// `HEDL_MAX_BATCH_FILES=50000` overrides the default maximum batch size.
///
/// Marked `#[serial]` because it mutates the process environment.
#[test]
#[serial]
fn test_get_max_batch_files_env_override() {
    std::env::set_var("HEDL_MAX_BATCH_FILES", "50000");
    let max = get_max_batch_files();

    // Clean up *before* asserting: a failed assertion panics, and cleanup placed
    // after it would never run, leaking the override into the rest of the test
    // process.
    std::env::remove_var("HEDL_MAX_BATCH_FILES");

    assert_eq!(max, 50_000);
}
398
/// A non-numeric `HEDL_MAX_BATCH_FILES` value falls back to the default of
/// 10 000.
///
/// Marked `#[serial]` because it mutates the process environment.
#[test]
#[serial]
fn test_get_max_batch_files_invalid_env() {
    std::env::set_var("HEDL_MAX_BATCH_FILES", "invalid");
    let max = get_max_batch_files();

    // Clean up *before* asserting: a failed assertion panics, and cleanup placed
    // after it would never run, leaving the bogus value set for the rest of the
    // test process.
    std::env::remove_var("HEDL_MAX_BATCH_FILES");

    assert_eq!(max, 10_000); // Falls back to default
}
407
/// With `HEDL_MAX_BATCH_FILES` unset, `BatchConfig::default()` carries a
/// `max_files` limit of 10 000.
///
/// Marked `#[serial]` because it mutates the process environment.
#[test]
#[serial]
fn test_batch_config_default_has_limit() {
    std::env::remove_var("HEDL_MAX_BATCH_FILES");

    let defaults = BatchConfig::default();
    let limit = defaults.max_files;

    assert!(limit.is_some());
    assert_eq!(limit.unwrap(), 10_000);
}
416
/// Smoke test: calling `warn_large_batch` with 5000 files and the flag off
/// must not panic. Its stderr output is not inspected here.
#[test]
fn test_warn_large_batch_above_threshold() {
    let file_count = 5000;
    let verbose = false;
    warn_large_batch(file_count, verbose);
}
422
/// Smoke test: calling `warn_large_batch` with 500 files and the flag off
/// must not panic.
#[test]
fn test_warn_large_batch_below_threshold() {
    let file_count = 500;
    let verbose = false;
    warn_large_batch(file_count, verbose);
}
427
/// Smoke test: calling `warn_large_batch` with 5000 files and the flag on
/// must not panic. Whether the warning is actually suppressed is not
/// observable from this test.
#[test]
fn test_warn_large_batch_verbose_suppresses() {
    let file_count = 5000;
    let verbose = true;
    warn_large_batch(file_count, verbose);
}
432
433    // ============================================================================
434    // Thread Pool Tests
435    // ============================================================================
436
/// With `max_threads: Some(2)` and a threshold of 1 (so that two files
/// already take the parallel path), both files are processed successfully.
#[test]
fn test_local_thread_pool_creation() {
    let config = BatchConfig {
        parallel_threshold: 1, // parallel even for just 2 files
        max_threads: Some(2),
        ..Default::default()
    };
    let executor = BatchExecutor::new(config);

    let files: Vec<PathBuf> = ["test1.hedl", "test2.hedl"]
        .iter()
        .map(PathBuf::from)
        .collect();

    let outcome = executor.process(&files, MockOperation { should_fail: false }, false);
    assert!(outcome.is_ok());

    let summary = outcome.unwrap();
    assert_eq!(summary.total_files(), 2);
    assert_eq!(summary.success_count(), 2);
    assert_eq!(summary.failure_count(), 0);
}
455
/// Requesting zero threads is invalid: `process` must return
/// `CliError::ThreadPoolError` carrying `requested_threads == 0`.
#[test]
fn test_invalid_thread_count() {
    let config = BatchConfig {
        parallel_threshold: 1,
        max_threads: Some(0), // invalid: zero threads
        ..Default::default()
    };
    let executor = BatchExecutor::new(config);

    let files = vec![PathBuf::from("test.hedl")];
    let results = executor.process(&files, MockOperation { should_fail: false }, false);

    assert!(results.is_err());
    if let Err(CliError::ThreadPoolError {
        requested_threads, ..
    }) = results
    {
        assert_eq!(requested_threads, 0);
    } else {
        panic!("Expected ThreadPoolError, got: {results:?}");
    }
}
477
/// Runs two executors on separate OS threads, one configured with
/// `max_threads: Some(2)` and the other with `Some(4)`, and checks that both
/// batches finish with every file succeeding.
#[test]
fn test_concurrent_batch_operations_different_pools() {
    use std::sync::Arc;
    use std::thread;

    let shared_files = vec![PathBuf::from("test1.hedl"), PathBuf::from("test2.hedl")];

    // Two executors that differ only in their thread count.
    let two_thread_executor = Arc::new(BatchExecutor::new(BatchConfig {
        parallel_threshold: 1,
        max_threads: Some(2),
        ..Default::default()
    }));
    let four_thread_executor = Arc::new(BatchExecutor::new(BatchConfig {
        parallel_threshold: 1,
        max_threads: Some(4),
        ..Default::default()
    }));

    let worker_a = {
        let files = shared_files.clone();
        let executor = Arc::clone(&two_thread_executor);
        thread::spawn(move || {
            executor.process(&files, MockOperation { should_fail: false }, false)
        })
    };

    let worker_b = {
        let files = shared_files.clone();
        let executor = Arc::clone(&four_thread_executor);
        thread::spawn(move || {
            executor.process(&files, MockOperation { should_fail: false }, false)
        })
    };

    // Neither worker thread may panic, and both batch calls must succeed.
    let outcome_a = worker_a.join().unwrap();
    let outcome_b = worker_b.join().unwrap();

    assert!(outcome_a.is_ok(), "First processor should succeed");
    assert!(outcome_b.is_ok(), "Second processor should succeed");

    let summary_a = outcome_a.unwrap();
    let summary_b = outcome_b.unwrap();

    assert_eq!(summary_a.total_files(), 2);
    assert_eq!(summary_a.success_count(), 2);
    assert_eq!(summary_b.total_files(), 2);
    assert_eq!(summary_b.success_count(), 2);
}
523
/// Ten files processed with the default configuration (no `max_threads`)
/// must all succeed.
///
/// The intent, per the original comments, is that this path uses the global
/// pool rather than creating a local one; the test only observes the results,
/// not which pool ran the work.
#[test]
fn test_default_config_uses_global_pool() {
    let executor = BatchExecutor::default_config();

    // test1.hedl ..= test10.hedl
    let files: Vec<PathBuf> = (1..=10)
        .map(|i| PathBuf::from(format!("test{i}.hedl")))
        .collect();

    let outcome = executor.process(&files, MockOperation { should_fail: false }, false);
    assert!(outcome.is_ok());

    let summary = outcome.unwrap();
    assert_eq!(summary.total_files(), 10);
    assert_eq!(summary.success_count(), 10);
}
550
/// With `max_threads: Some(3)` and a failing operation, the batch call still
/// returns `Ok` and reports all three files as failures.
#[test]
fn test_local_pool_with_failures() {
    let config = BatchConfig {
        parallel_threshold: 1,
        max_threads: Some(3),
        ..Default::default()
    };
    let executor = BatchExecutor::new(config);

    let files: Vec<PathBuf> = ["test1.hedl", "test2.hedl", "test3.hedl"]
        .iter()
        .map(PathBuf::from)
        .collect();

    let outcome = executor.process(&files, MockOperation { should_fail: true }, false);
    assert!(outcome.is_ok());

    let summary = outcome.unwrap();
    assert_eq!(summary.total_files(), 3);
    assert_eq!(summary.success_count(), 0);
    assert_eq!(summary.failure_count(), 3);
}
574
/// Two files with a `parallel_threshold` of 100 and `max_threads: Some(8)`
/// still process successfully. Per the original comment, the thread cap is
/// expected to be ignored when the file count is below the threshold.
#[test]
fn test_serial_processing_ignores_max_threads() {
    let config = BatchConfig {
        parallel_threshold: 100, // high threshold, only 2 files below
        max_threads: Some(8),
        ..Default::default()
    };
    let executor = BatchExecutor::new(config);

    let files: Vec<PathBuf> = ["test1.hedl", "test2.hedl"]
        .iter()
        .map(PathBuf::from)
        .collect();

    let outcome = executor.process(&files, MockOperation { should_fail: false }, false);
    assert!(outcome.is_ok());

    let summary = outcome.unwrap();
    assert_eq!(summary.total_files(), 2);
    assert_eq!(summary.success_count(), 2);
}
593
/// With `max_threads: Some(1)`, three files are still all processed
/// successfully.
#[test]
fn test_local_pool_single_thread() {
    let config = BatchConfig {
        parallel_threshold: 1,
        max_threads: Some(1),
        ..Default::default()
    };
    let executor = BatchExecutor::new(config);

    let files: Vec<PathBuf> = ["test1.hedl", "test2.hedl", "test3.hedl"]
        .iter()
        .map(PathBuf::from)
        .collect();

    let outcome = executor.process(&files, MockOperation { should_fail: false }, false);
    assert!(outcome.is_ok());

    let summary = outcome.unwrap();
    assert_eq!(summary.total_files(), 3);
    assert_eq!(summary.success_count(), 3);
}
616
/// With `max_threads: Some(16)`, thirty-two files are all processed
/// successfully.
#[test]
fn test_local_pool_many_threads() {
    let config = BatchConfig {
        parallel_threshold: 1,
        max_threads: Some(16),
        ..Default::default()
    };
    let executor = BatchExecutor::new(config);

    let mut files: Vec<PathBuf> = Vec::with_capacity(32);
    for i in 0..32 {
        files.push(PathBuf::from(format!("file{i}.hedl")));
    }

    let outcome = executor.process(&files, MockOperation { should_fail: false }, false);
    assert!(outcome.is_ok());

    let summary = outcome.unwrap();
    assert_eq!(summary.total_files(), 32);
    assert_eq!(summary.success_count(), 32);
}
637
/// Requesting zero threads must produce a `CliError::ThreadPoolError` whose
/// `requested_threads` is 0 and whose message mentions "0 threads".
#[test]
fn test_thread_pool_error_message() {
    let processor = BatchExecutor::new(BatchConfig {
        max_threads: Some(0),
        parallel_threshold: 1,
        ..Default::default()
    });

    let files = vec![PathBuf::from("test.hedl")];
    let result = processor.process(&files, MockOperation { should_fail: false }, false);

    match result {
        Err(CliError::ThreadPoolError {
            message,
            requested_threads,
        }) => {
            assert_eq!(requested_threads, 0);
            assert!(message.contains("0 threads"), "Message: {message}");
        }
        // Report what actually came back so a failure is diagnosable from the
        // test output alone, matching `test_invalid_thread_count`.
        other => panic!("Expected ThreadPoolError, got: {other:?}"),
    }
}
660}