fast_yaml_cli/batch/
processor.rs

1//! Parallel file processor for batch YAML formatting.
2
3use std::path::Path;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::time::Instant;
6
7use fast_yaml_core::emitter::{Emitter, EmitterConfig};
8use rayon::prelude::*;
9
10use super::config::ProcessingConfig;
11use super::discovery::DiscoveredFile;
12use super::error::ProcessingError;
13use super::reader::SmartFileReader;
14use super::result::{BatchResult, FileOutcome, FileResult};
15
16/// Parallel file processor for batch YAML formatting.
17///
18/// Processes multiple YAML files in parallel using Rayon's work-stealing scheduler.
19/// Automatically chooses optimal reading strategy based on file size (in-memory vs mmap).
20pub struct BatchProcessor {
21    config: ProcessingConfig,
22    emitter_config: EmitterConfig,
23    reader: SmartFileReader,
24}
25
26impl BatchProcessor {
27    /// Creates a new `BatchProcessor` with the given configuration
28    pub fn new(config: ProcessingConfig) -> Self {
29        let emitter_config = EmitterConfig::new()
30            .with_indent(config.indent as usize)
31            .with_width(config.width);
32
33        let reader = SmartFileReader::with_threshold(config.mmap_threshold as u64);
34
35        Self {
36            config,
37            emitter_config,
38            reader,
39        }
40    }
41
42    /// Processes a batch of discovered files in parallel.
43    ///
44    /// Returns aggregated results with success/failure counts and error details.
45    /// Continues processing all files even if some fail.
46    pub fn process(&self, files: &[DiscoveredFile]) -> BatchResult {
47        let batch_start = Instant::now();
48        let total = files.len();
49
50        if total == 0 {
51            return BatchResult::new();
52        }
53
54        let results = if self.should_use_custom_pool() {
55            self.process_with_custom_pool(files)
56        } else {
57            self.process_with_default_pool(files)
58        };
59
60        let mut batch = BatchResult::from_results(results);
61        batch.duration = batch_start.elapsed();
62        batch
63    }
64
65    /// Processes files using a custom thread pool with configured worker count
66    fn process_with_custom_pool(&self, files: &[DiscoveredFile]) -> Vec<FileResult> {
67        rayon::ThreadPoolBuilder::new()
68            .num_threads(self.config.effective_workers())
69            .build()
70            .map_or_else(
71                |_| {
72                    // Fallback to default pool if custom pool creation fails
73                    self.process_files_parallel(files)
74                },
75                |pool| pool.install(|| self.process_files_parallel(files)),
76            )
77    }
78
79    /// Processes files using the default Rayon thread pool.
80    ///
81    /// For very small batches (<10 files), uses sequential processing
82    /// to avoid Rayon thread pool overhead.
83    fn process_with_default_pool(&self, files: &[DiscoveredFile]) -> Vec<FileResult> {
84        // Sequential path for tiny batches - avoids Rayon overhead
85        if files.len() < 10 {
86            self.process_files_sequential(files)
87        } else {
88            self.process_files_parallel(files)
89        }
90    }
91
92    /// Processes files in parallel using Rayon's `par_iter`
93    fn process_files_parallel(&self, files: &[DiscoveredFile]) -> Vec<FileResult> {
94        let processed = AtomicUsize::new(0);
95        let total = files.len();
96
97        files
98            .par_iter()
99            .map(|file| {
100                let result = self.process_single_file(&file.path);
101
102                if self.config.verbose {
103                    let n = processed.fetch_add(1, Ordering::Relaxed) + 1;
104                    // Pre-format string to reduce time holding stderr lock
105                    let msg = format!("[{n}/{total}] {}", file.path.display());
106                    eprintln!("{msg}");
107                }
108
109                result
110            })
111            .collect()
112    }
113
114    /// Processes files sequentially without parallel overhead.
115    ///
116    /// Used for small batches (<10 files) where Rayon thread pool overhead
117    /// would exceed the benefit of parallelism.
118    fn process_files_sequential(&self, files: &[DiscoveredFile]) -> Vec<FileResult> {
119        let total = files.len();
120
121        files
122            .iter()
123            .enumerate()
124            .map(|(i, file)| {
125                let result = self.process_single_file(&file.path);
126
127                if self.config.verbose {
128                    eprintln!("[{}/{}] {}", i + 1, total, file.path.display());
129                }
130
131                result
132            })
133            .collect()
134    }
135
136    /// Processes a single file and returns the result
137    fn process_single_file(&self, path: &Path) -> FileResult {
138        let start = Instant::now();
139
140        match self.format_file(path) {
141            Ok(changed) => {
142                let duration = start.elapsed();
143                let outcome = if self.config.dry_run && changed {
144                    FileOutcome::Skipped { duration }
145                } else if changed {
146                    FileOutcome::Formatted {
147                        changed: true,
148                        duration,
149                    }
150                } else {
151                    FileOutcome::Unchanged { duration }
152                };
153                FileResult::new(path.to_path_buf(), outcome)
154            }
155            Err(error) => FileResult::new(
156                path.to_path_buf(),
157                FileOutcome::Failed {
158                    error,
159                    duration: start.elapsed(),
160                },
161            ),
162        }
163    }
164
165    /// Formats a single file and returns whether it changed.
166    ///
167    /// Reads the file, formats it, compares with original, and writes if changed
168    /// (unless in dry-run mode).
169    fn format_file(&self, path: &Path) -> Result<bool, ProcessingError> {
170        let file_content = self.reader.read(path)?;
171        let original = file_content.as_str()?;
172
173        let formatted = Emitter::format_with_config(original, &self.emitter_config)
174            .map_err(|e| ProcessingError::FormatError(format!("{}: {}", path.display(), e)))?;
175
176        let changed = original != formatted;
177
178        if changed && self.config.in_place && !self.config.dry_run {
179            Self::write_file_atomic(path, &formatted)?;
180        }
181
182        Ok(changed)
183    }
184
185    /// Writes content to file atomically using temp file + rename.
186    ///
187    /// This ensures that concurrent reads will either see the old content
188    /// or the new content, never a partial write.
189    fn write_file_atomic(path: &Path, content: &str) -> Result<(), ProcessingError> {
190        let temp_path = path.with_extension("tmp");
191
192        std::fs::write(&temp_path, content).map_err(ProcessingError::WriteError)?;
193
194        std::fs::rename(&temp_path, path).map_err(ProcessingError::WriteError)?;
195
196        Ok(())
197    }
198
199    /// Returns true if a custom thread pool should be used
200    const fn should_use_custom_pool(&self) -> bool {
201        self.config.workers != 0
202    }
203}
204
205/// Convenience function for batch processing with default configuration
206pub fn process_batch(files: &[DiscoveredFile], config: ProcessingConfig) -> BatchResult {
207    let processor = BatchProcessor::new(config);
208    processor.process(files)
209}
210
#[cfg(test)]
mod tests {
    use super::super::discovery::DiscoveryOrigin;
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Writes `content` to `name` inside `dir` and wraps the resulting path as
    /// a directly-specified `DiscoveredFile`.
    fn create_test_file(dir: &TempDir, name: &str, content: &str) -> DiscoveredFile {
        let path = dir.path().join(name);
        fs::write(&path, content).unwrap();
        DiscoveredFile {
            path,
            origin: DiscoveryOrigin::DirectPath,
        }
    }

    /// A well-formed file is processed without error.
    #[test]
    fn test_process_single_file_success() {
        let tmp = TempDir::new().unwrap();
        let input = create_test_file(&tmp, "test.yaml", "key: value\n");

        let processor = BatchProcessor::new(ProcessingConfig::new());

        let outcome = processor.process_single_file(&input.path);
        assert!(outcome.is_success());
    }

    /// With in-place writing disabled the file on disk stays untouched.
    #[test]
    fn test_process_single_file_no_write_when_not_in_place() {
        let tmp = TempDir::new().unwrap();
        let input = create_test_file(&tmp, "test.yaml", "key: value\n");

        let processor = BatchProcessor::new(ProcessingConfig::new().with_in_place(false));

        let before = fs::read_to_string(&input.path).unwrap();
        let outcome = processor.process_single_file(&input.path);
        let after = fs::read_to_string(&input.path).unwrap();

        // File should not be modified when in_place is false
        assert_eq!(before, after);
        assert!(outcome.is_success());
    }

    /// Dry-run wins over in-place: nothing is written.
    #[test]
    fn test_process_single_file_dry_run() {
        let tmp = TempDir::new().unwrap();
        let input = create_test_file(&tmp, "test.yaml", "key:value\n");

        let options = ProcessingConfig::new()
            .with_dry_run(true)
            .with_in_place(true);
        let processor = BatchProcessor::new(options);

        let before = fs::read_to_string(&input.path).unwrap();
        let _ = processor.process_single_file(&input.path);
        let after = fs::read_to_string(&input.path).unwrap();

        assert_eq!(before, after);
    }

    /// When the processor reports a change in in-place mode, the file on disk
    /// must differ from what was there before.
    #[test]
    fn test_process_single_file_in_place() {
        let tmp = TempDir::new().unwrap();
        let input = create_test_file(&tmp, "test.yaml", "key:  value\n");

        let processor = BatchProcessor::new(ProcessingConfig::new().with_in_place(true));

        let before = fs::read_to_string(&input.path).unwrap();
        let outcome = processor.process_single_file(&input.path);
        let after = fs::read_to_string(&input.path).unwrap();

        if outcome.outcome.was_changed() {
            assert_ne!(before, after);
        }
    }

    /// Three valid files aggregate into three successes.
    #[test]
    fn test_batch_result_aggregation() {
        let tmp = TempDir::new().unwrap();

        let inputs = vec![
            create_test_file(&tmp, "file1.yaml", "key: value\n"),
            create_test_file(&tmp, "file2.yaml", "list:\n  - item\n"),
            create_test_file(&tmp, "file3.yaml", "valid: yaml\n"),
        ];

        let processor = BatchProcessor::new(ProcessingConfig::new());
        let summary = processor.process(&inputs);

        assert_eq!(summary.total, 3);
        assert_eq!(summary.success_count(), 3);
        assert!(summary.is_success());
    }

    /// The default configuration does not request a custom pool.
    #[test]
    fn test_effective_workers_default() {
        let processor = BatchProcessor::new(ProcessingConfig::new());

        assert!(!processor.should_use_custom_pool());
    }

    /// An explicit worker count requests a custom pool of that size.
    #[test]
    fn test_effective_workers_custom() {
        let processor = BatchProcessor::new(ProcessingConfig::new().with_workers(4));

        assert!(processor.should_use_custom_pool());
        assert_eq!(processor.config.effective_workers(), 4);
    }

    /// An empty batch is a successful batch with zero files.
    #[test]
    fn test_process_empty_batch() {
        let processor = BatchProcessor::new(ProcessingConfig::new());

        let summary = processor.process(&[]);

        assert_eq!(summary.total, 0);
        assert!(summary.is_success());
    }

    /// Failures are recorded without aborting the rest of the batch.
    #[test]
    fn test_process_batch_with_error() {
        let tmp = TempDir::new().unwrap();

        let inputs = vec![
            create_test_file(&tmp, "file1.yaml", "key: value\n"),
            create_test_file(&tmp, "file2.yaml", "invalid: [\n"),
            // Never created on disk, so reading it must fail.
            DiscoveredFile {
                path: tmp.path().join("nonexistent.yaml"),
                origin: DiscoveryOrigin::DirectPath,
            },
        ];

        let processor = BatchProcessor::new(ProcessingConfig::new());
        let summary = processor.process(&inputs);

        assert_eq!(summary.total, 3);
        assert!(summary.failed >= 1);
        assert!(!summary.is_success());
        assert!(!summary.errors.is_empty());
    }

    /// Malformed YAML surfaces as an error from `format_file`.
    #[test]
    fn test_format_file_parse_error() {
        let tmp = TempDir::new().unwrap();
        let input = create_test_file(&tmp, "invalid.yaml", "invalid: [\n");

        let processor = BatchProcessor::new(ProcessingConfig::new());

        assert!(processor.format_file(&input.path).is_err());
    }

    /// The atomic write replaces the content and leaves no `.tmp` sibling.
    #[test]
    fn test_atomic_write() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.path().join("test.yaml");
        fs::write(&target, "old content").unwrap();

        BatchProcessor::write_file_atomic(&target, "new content").unwrap();

        assert_eq!(fs::read_to_string(&target).unwrap(), "new content");
        assert!(!target.with_extension("tmp").exists());
    }

    /// `process_batch` behaves like building a processor and running it.
    #[test]
    fn test_process_batch_convenience_function() {
        let tmp = TempDir::new().unwrap();
        let inputs = vec![create_test_file(&tmp, "file1.yaml", "key: value\n")];

        let summary = process_batch(&inputs, ProcessingConfig::new());

        assert_eq!(summary.total, 1);
        assert!(summary.is_success());
    }

    /// A file well above the (lowered) mmap threshold is processed successfully.
    #[test]
    fn test_large_file_processing() {
        let tmp = TempDir::new().unwrap();

        let body = "key: value\n".repeat(100_000);
        let input = create_test_file(&tmp, "large.yaml", &body);

        let processor = BatchProcessor::new(ProcessingConfig::new().with_mmap_threshold(1024));

        let outcome = processor.process_single_file(&input.path);
        assert!(outcome.is_success());
    }
}