fast_yaml_parallel/files/
processor.rs

1//! Parallel file processor for batch YAML operations.
2
3use std::io::Write;
4use std::path::{Path, PathBuf};
5use std::time::Instant;
6
7use fast_yaml_core::emitter::{Emitter, EmitterConfig};
8use rayon::prelude::*;
9
10use crate::config::Config;
11use crate::error::{Error, Result};
12use crate::io::SmartReader;
13use crate::result::{BatchResult, FileOutcome, FileResult};
14
/// Parallel file processor for batch YAML operations.
///
/// Processes multiple YAML files in parallel using Rayon's work-stealing scheduler.
/// Small batches are handled sequentially instead (see `should_use_sequential`).
/// Automatically chooses optimal reading strategy based on file size (in-memory vs mmap).
///
/// # Security: Path Trust Boundary
///
/// This is a library crate providing file processing primitives. Path validation
/// is the responsibility of the caller:
///
/// - **For CLI tools**: Validate paths before passing to this API
/// - **For libraries**: Document that paths must be trusted
/// - **Path traversal**: No canonicalization or ".." filtering is performed
/// - **Symlinks**: Followed without validation (use OS permissions for access control)
///
/// If your application accepts user-controlled paths, validate them before calling
/// these methods. Example validation:
///
/// ```no_run
/// use std::path::{Path, PathBuf};
///
/// fn validate_path(path: &Path, base_dir: &Path) -> Result<PathBuf, String> {
///     // Canonicalize both sides so the prefix check compares resolved paths.
///     let base = base_dir.canonicalize()
///         .map_err(|e| format!("invalid base directory: {}", e))?;
///     let canonical = path.canonicalize()
///         .map_err(|e| format!("invalid path: {}", e))?;
///
///     if !canonical.starts_with(&base) {
///         return Err("path outside allowed directory".to_string());
///     }
///
///     Ok(canonical)
/// }
/// ```
#[derive(Debug)]
pub struct FileProcessor {
    /// Batch settings; `max_input_size()` is checked against each file's size
    /// before the file is read.
    config: Config,
    /// Reader built from `config.mmap_threshold()`; all file reads go through it.
    reader: SmartReader,
}
52
53impl FileProcessor {
54    /// Creates a processor with default config.
55    pub fn new() -> Self {
56        Self::with_config(Config::default())
57    }
58
59    /// Creates a processor with custom config.
60    pub const fn with_config(config: Config) -> Self {
61        let reader = SmartReader::with_threshold(config.mmap_threshold() as u64);
62
63        Self { config, reader }
64    }
65
66    /// Process files with custom operation.
67    ///
68    /// Generic function for applying custom processing to files in parallel.
69    pub fn process<F, R>(&self, paths: &[PathBuf], f: F) -> BatchResult
70    where
71        F: Fn(&Path, &str) -> Result<R> + Sync,
72        R: Send,
73    {
74        let batch_start = Instant::now();
75        let total = paths.len();
76
77        if total == 0 {
78            return BatchResult::new();
79        }
80
81        let results = if Self::should_use_sequential(paths) {
82            self.process_files_sequential(paths, &f)
83        } else {
84            self.process_files_parallel(paths, &f)
85        };
86
87        let mut batch = BatchResult::from_results(results);
88        batch.duration = batch_start.elapsed();
89        batch
90    }
91
92    /// Parse all files and return `BatchResult`.
93    pub fn parse_files(&self, paths: &[PathBuf]) -> BatchResult {
94        self.process(paths, |path, content| {
95            fast_yaml_core::Parser::parse_str(content)
96                .map_err(|source| Error::Parse { index: 0, source })?
97                .ok_or_else(|| Error::Format {
98                    message: format!("empty document in {}", path.display()),
99                })?;
100            Ok(())
101        })
102    }
103
104    /// Format files and return `(path, formatted_content)` pairs.
105    pub fn format_files(
106        &self,
107        paths: &[PathBuf],
108        emitter_config: &EmitterConfig,
109    ) -> Vec<(PathBuf, Result<String>)> {
110        let process_file = |path: &Path| -> Result<String> {
111            let file_content = self.reader.read(path)?;
112            let original = file_content.as_str()?;
113
114            Emitter::format_with_config(original, emitter_config).map_err(|e| Error::Format {
115                message: format!("{}: {}", path.display(), e),
116            })
117        };
118
119        if Self::should_use_sequential(paths) {
120            paths
121                .iter()
122                .map(|path| (path.clone(), process_file(path)))
123                .collect()
124        } else {
125            paths
126                .par_iter()
127                .map(|path| (path.clone(), process_file(path)))
128                .collect()
129        }
130    }
131
132    /// Format files in place (write back if changed).
133    pub fn format_in_place(
134        &self,
135        paths: &[PathBuf],
136        emitter_config: &EmitterConfig,
137    ) -> BatchResult {
138        let batch_start = std::time::Instant::now();
139        let total = paths.len();
140
141        if total == 0 {
142            return BatchResult::new();
143        }
144
145        let results = if Self::should_use_sequential(paths) {
146            paths
147                .iter()
148                .map(|path| self.format_single_file(path, emitter_config))
149                .collect()
150        } else {
151            paths
152                .par_iter()
153                .map(|path| self.format_single_file(path, emitter_config))
154                .collect()
155        };
156
157        let mut batch = BatchResult::from_results(results);
158        batch.duration = batch_start.elapsed();
159        batch
160    }
161
162    /// Format a single file in place
163    fn format_single_file(&self, path: &Path, emitter_config: &EmitterConfig) -> FileResult {
164        let start = std::time::Instant::now();
165
166        let metadata = match std::fs::metadata(path) {
167            Ok(m) => m,
168            Err(source) => {
169                return FileResult::new(
170                    path.to_path_buf(),
171                    FileOutcome::Error {
172                        error: Error::Io {
173                            path: path.to_path_buf(),
174                            source,
175                        },
176                        duration: start.elapsed(),
177                    },
178                );
179            }
180        };
181
182        let file_size = metadata.len();
183        let max_size = self.config.max_input_size();
184
185        #[allow(clippy::cast_possible_truncation)]
186        let size = file_size as usize;
187
188        if file_size > max_size as u64 {
189            return FileResult::new(
190                path.to_path_buf(),
191                FileOutcome::Error {
192                    error: Error::InputTooLarge {
193                        size,
194                        max: max_size,
195                    },
196                    duration: start.elapsed(),
197                },
198            );
199        }
200
201        let file_content = match self.reader.read(path) {
202            Ok(c) => c,
203            Err(error) => {
204                return FileResult::new(
205                    path.to_path_buf(),
206                    FileOutcome::Error {
207                        error,
208                        duration: start.elapsed(),
209                    },
210                );
211            }
212        };
213
214        let content = match file_content.as_str() {
215            Ok(s) => s,
216            Err(error) => {
217                return FileResult::new(
218                    path.to_path_buf(),
219                    FileOutcome::Error {
220                        error,
221                        duration: start.elapsed(),
222                    },
223                );
224            }
225        };
226
227        let formatted = match Emitter::format_with_config(content, emitter_config) {
228            Ok(f) => f,
229            Err(e) => {
230                return FileResult::new(
231                    path.to_path_buf(),
232                    FileOutcome::Error {
233                        error: Error::Format {
234                            message: format!("{}: {}", path.display(), e),
235                        },
236                        duration: start.elapsed(),
237                    },
238                );
239            }
240        };
241
242        let changed = content != formatted;
243
244        if changed && let Err(error) = Self::write_file_atomic(path, &formatted) {
245            return FileResult::new(
246                path.to_path_buf(),
247                FileOutcome::Error {
248                    error,
249                    duration: start.elapsed(),
250                },
251            );
252        }
253
254        let duration = start.elapsed();
255        let outcome = if changed {
256            FileOutcome::Changed { duration }
257        } else {
258            FileOutcome::Success { duration }
259        };
260
261        FileResult::new(path.to_path_buf(), outcome)
262    }
263
264    /// Processes files in parallel using Rayon's `par_iter`
265    fn process_files_parallel<F, R>(&self, paths: &[PathBuf], f: &F) -> Vec<FileResult>
266    where
267        F: Fn(&Path, &str) -> Result<R> + Sync,
268        R: Send,
269    {
270        paths
271            .par_iter()
272            .map(|path| self.process_single_file(path, f))
273            .collect()
274    }
275
276    /// Processes files sequentially without parallel overhead.
277    fn process_files_sequential<F, R>(&self, paths: &[PathBuf], f: &F) -> Vec<FileResult>
278    where
279        F: Fn(&Path, &str) -> Result<R>,
280    {
281        paths
282            .iter()
283            .map(|path| self.process_single_file(path, f))
284            .collect()
285    }
286
287    /// Processes a single file and returns the result
288    fn process_single_file<F, R>(&self, path: &Path, f: &F) -> FileResult
289    where
290        F: Fn(&Path, &str) -> Result<R>,
291    {
292        let start = Instant::now();
293
294        match self.process_file_content(path, f) {
295            Ok(()) => {
296                let duration = start.elapsed();
297                FileResult::new(path.to_path_buf(), FileOutcome::Success { duration })
298            }
299            Err(error) => FileResult::new(
300                path.to_path_buf(),
301                FileOutcome::Error {
302                    error,
303                    duration: start.elapsed(),
304                },
305            ),
306        }
307    }
308
309    /// Process file content with given function
310    fn process_file_content<F, R>(&self, path: &Path, f: &F) -> Result<()>
311    where
312        F: Fn(&Path, &str) -> Result<R>,
313    {
314        let metadata = std::fs::metadata(path).map_err(|source| Error::Io {
315            path: path.to_path_buf(),
316            source,
317        })?;
318
319        let file_size = metadata.len();
320        let max_size = self.config.max_input_size();
321
322        if file_size > max_size as u64 {
323            #[allow(clippy::cast_possible_truncation)]
324            let size = file_size as usize;
325            return Err(Error::InputTooLarge {
326                size,
327                max: max_size,
328            });
329        }
330
331        let file_content = self.reader.read(path)?;
332        let content = file_content.as_str()?;
333
334        f(path, content)?;
335        Ok(())
336    }
337
338    /// Writes content to file atomically using secure temp file + rename.
339    ///
340    /// Uses `tempfile::NamedTempFile` to prevent TOCTOU vulnerabilities:
341    /// - Creates temp file with `O_EXCL` flag (fails if exists)
342    /// - Uses unpredictable name to prevent symlink attacks
343    /// - Atomically renames to final path
344    fn write_file_atomic(path: &Path, content: &str) -> Result<()> {
345        let dir = path.parent().ok_or_else(|| Error::Write {
346            path: path.to_path_buf(),
347            source: std::io::Error::new(std::io::ErrorKind::NotFound, "no parent directory"),
348        })?;
349
350        let mut temp = tempfile::NamedTempFile::new_in(dir).map_err(|source| Error::Write {
351            path: path.to_path_buf(),
352            source,
353        })?;
354
355        temp.write_all(content.as_bytes())
356            .map_err(|source| Error::Write {
357                path: path.to_path_buf(),
358                source,
359            })?;
360
361        temp.persist(path).map_err(|e| Error::Write {
362            path: path.to_path_buf(),
363            source: e.error,
364        })?;
365
366        Ok(())
367    }
368
369    /// Returns true if sequential processing should be used.
370    ///
371    /// Sequential processing is preferred when:
372    /// - Very few files (< 4)
373    /// - Small total size (< 1MB) AND moderate file count (< 10)
374    ///
375    /// This avoids parallelism overhead for small workloads while enabling
376    /// parallel processing for large files even if there are only a few of them.
377    fn should_use_sequential(paths: &[PathBuf]) -> bool {
378        let file_count = paths.len();
379
380        if file_count < 4 {
381            return true;
382        }
383
384        let total_size: u64 = paths
385            .iter()
386            .filter_map(|p| std::fs::metadata(p).ok())
387            .map(|m| m.len())
388            .sum();
389
390        total_size < 1_000_000 && file_count < 10
391    }
392}
393
394impl Default for FileProcessor {
395    fn default() -> Self {
396        Self::new()
397    }
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Writes `content` to `name` inside `dir` and returns the resulting path.
    fn create_test_file(dir: &TempDir, name: &str, content: &str) -> PathBuf {
        let path = dir.path().join(name);
        fs::write(&path, content).unwrap();
        path
    }

    /// Creates `count` files named `{prefix}{i}.yaml`, with `content_for(i)` as body.
    fn create_numbered_files(
        dir: &TempDir,
        prefix: &str,
        count: usize,
        content_for: impl Fn(usize) -> String,
    ) -> Vec<PathBuf> {
        (0..count)
            .map(|i| create_test_file(dir, &format!("{prefix}{i}.yaml"), &content_for(i)))
            .collect()
    }

    #[test]
    fn test_file_processor_new() {
        let _processor = FileProcessor::new();
    }

    #[test]
    fn test_file_processor_with_config() {
        let config = Config::new().with_workers(Some(4));
        let _processor = FileProcessor::with_config(config);
    }

    #[test]
    fn test_process_single_file() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "test.yaml", "key: value\n");

        let processor = FileProcessor::new();
        let result = processor.parse_files(&[path]);

        assert_eq!(result.total, 1);
        assert!(result.is_success());
    }

    #[test]
    fn test_process_multiple_files() {
        let dir = TempDir::new().unwrap();
        let paths = vec![
            create_test_file(&dir, "file1.yaml", "key1: value1\n"),
            create_test_file(&dir, "file2.yaml", "key2: value2\n"),
            create_test_file(&dir, "file3.yaml", "key3: value3\n"),
        ];

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 3);
        assert!(result.is_success());
    }

    #[test]
    fn test_process_empty_batch() {
        let processor = FileProcessor::new();
        let result = processor.parse_files(&[]);

        assert_eq!(result.total, 0);
        assert!(result.is_success());
    }

    #[test]
    fn test_process_with_errors() {
        let dir = TempDir::new().unwrap();
        let paths = vec![
            create_test_file(&dir, "valid.yaml", "key: value\n"),
            create_test_file(&dir, "invalid.yaml", "invalid: [\n"),
        ];

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 2);
        assert!(!result.is_success());
        assert!(result.failed >= 1);
    }

    #[test]
    fn test_parse_files() {
        let dir = TempDir::new().unwrap();
        let paths = vec![create_test_file(&dir, "test.yaml", "key: value\n")];

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert!(result.is_success());
    }

    #[test]
    fn test_format_files() {
        let dir = TempDir::new().unwrap();
        let paths = vec![create_test_file(&dir, "test.yaml", "key: value\n")];

        let processor = FileProcessor::new();
        let emitter_config = EmitterConfig::new();
        let results = processor.format_files(&paths, &emitter_config);

        assert_eq!(results.len(), 1);
        assert!(results[0].1.is_ok());
    }

    #[test]
    fn test_format_in_place() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "test.yaml", "key:  value\n");

        let processor = FileProcessor::new();
        let emitter_config = EmitterConfig::new();
        let result = processor.format_in_place(std::slice::from_ref(&path), &emitter_config);

        assert_eq!(result.total, 1);
    }

    #[test]
    fn test_atomic_write() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.yaml");
        fs::write(&path, "old content").unwrap();

        FileProcessor::write_file_atomic(&path, "new content").unwrap();

        let content = fs::read_to_string(&path).unwrap();
        assert_eq!(content, "new content");
    }

    #[test]
    fn test_large_file_with_mmap() {
        let dir = TempDir::new().unwrap();

        let large_content = "key: value\n".repeat(100_000);
        let path = create_test_file(&dir, "large.yaml", &large_content);

        let config = Config::new().with_mmap_threshold(1024);
        let processor = FileProcessor::with_config(config);

        let result = processor.parse_files(&[path]);
        assert!(result.is_success());
    }

    #[test]
    fn test_sequential_threshold_exactly_9_files() {
        let dir = TempDir::new().unwrap();

        // Exactly 9 files: one below the file-count threshold of 10.
        let paths = create_numbered_files(&dir, "file", 9, |_| "key: value\n".to_string());

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 9);
        assert!(result.is_success());
    }

    #[test]
    fn test_sequential_threshold_exactly_10_files() {
        let dir = TempDir::new().unwrap();

        // Exactly 10 files: at the threshold, which selects the parallel path.
        let paths = create_numbered_files(&dir, "file", 10, |_| "key: value\n".to_string());

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 10);
        assert!(result.is_success());
    }

    #[test]
    fn test_many_files_parallel() {
        let dir = TempDir::new().unwrap();

        // 50 files is well past the threshold, so this exercises parallel mode.
        let paths = create_numbered_files(&dir, "file", 50, |i| format!("index: {i}\n"));

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 50);
        assert!(result.is_success());
    }

    #[test]
    fn test_format_in_place_tracking() {
        let dir = TempDir::new().unwrap();

        // Whether this file counts as changed depends on emitter behavior.
        let path = create_test_file(&dir, "test.yaml", "key: value\n");

        let processor = FileProcessor::new();
        let emitter_config = EmitterConfig::new();
        let result = processor.format_in_place(std::slice::from_ref(&path), &emitter_config);

        assert_eq!(result.total, 1);
        // The changed count can never exceed the number of files processed.
        assert!(result.changed <= result.total);
    }

    #[test]
    fn test_format_changed_file() {
        let dir = TempDir::new().unwrap();

        // Input with irregular spacing after the colon.
        let path = create_test_file(&dir, "unformatted.yaml", "key:     value\n");

        let processor = FileProcessor::new();
        let emitter_config = EmitterConfig::new();
        let result = processor.format_in_place(std::slice::from_ref(&path), &emitter_config);

        // Only the file count is asserted: whether the file is reported as
        // changed depends on whether the emitter normalizes spacing.
        assert_eq!(result.total, 1);
    }

    #[test]
    fn test_all_files_fail() {
        let dir = TempDir::new().unwrap();

        // Every file contains an unterminated flow sequence.
        let paths = create_numbered_files(&dir, "invalid", 5, |_| "invalid: [\n".to_string());

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 5);
        assert_eq!(result.failed, 5);
        assert_eq!(result.success, 0);
        assert!(!result.is_success());
    }

    #[test]
    fn test_mixed_file_sizes() {
        let dir = TempDir::new().unwrap();

        // One file below and one file above the 1 KiB mmap threshold set below.
        let small = create_test_file(&dir, "small.yaml", "key: value\n");
        let large = create_test_file(&dir, "large.yaml", &"key: value\n".repeat(100_000));

        let config = Config::new().with_mmap_threshold(1024);
        let processor = FileProcessor::with_config(config);

        let result = processor.parse_files(&[small, large]);
        assert_eq!(result.total, 2);
        assert!(result.is_success());
    }

    #[test]
    fn test_partial_batch_failure() {
        let dir = TempDir::new().unwrap();

        let paths = vec![
            create_test_file(&dir, "valid1.yaml", "key1: value1\n"),
            create_test_file(&dir, "invalid.yaml", "broken: [\n"),
            create_test_file(&dir, "valid2.yaml", "key2: value2\n"),
        ];

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 3);
        assert_eq!(result.success, 2);
        assert_eq!(result.failed, 1);
        assert!(!result.is_success());
    }

    #[test]
    fn test_atomic_write_temp_file_cleanup() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.yaml");

        // Initial write creates the file.
        FileProcessor::write_file_atomic(&path, "content1").unwrap();
        assert_eq!(fs::read_to_string(&path).unwrap(), "content1");

        // Second write replaces it.
        FileProcessor::write_file_atomic(&path, "content2").unwrap();
        assert_eq!(fs::read_to_string(&path).unwrap(), "content2");

        // No temp files may be left behind: the target is the only entry.
        let entries = fs::read_dir(dir.path()).unwrap().count();
        assert_eq!(entries, 1);
    }

    #[test]
    #[cfg(unix)]
    fn test_write_to_readonly_directory() {
        use std::os::unix::fs::PermissionsExt;

        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.yaml");

        // Make directory read-only
        let mut perms = fs::metadata(dir.path()).unwrap().permissions();
        perms.set_mode(0o444);
        fs::set_permissions(dir.path(), perms).unwrap();

        // Privileged users (e.g. root in CI containers) bypass directory
        // permissions; detect that with a probe write so the test does not
        // fail spuriously.
        let probe = dir.path().join("probe");
        let bypasses_permissions = fs::write(&probe, b"").is_ok();

        // Attempt to write should fail
        let result = FileProcessor::write_file_atomic(&path, "content");

        // Restore permissions for cleanup
        let mut perms = fs::metadata(dir.path()).unwrap().permissions();
        perms.set_mode(0o755);
        let _ = fs::set_permissions(dir.path(), perms);

        if bypasses_permissions {
            return;
        }

        assert!(result.is_err());
    }

    #[test]
    fn test_process_custom_operation() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "test.yaml", "key: value\n");

        let processor = FileProcessor::new();

        // Custom operation that returns the content length.
        let result = processor.process(&[path], |_path, content| Ok(content.len()));

        assert!(result.is_success());
    }

    #[test]
    fn test_format_files_returns_results() {
        let dir = TempDir::new().unwrap();
        let paths = vec![
            create_test_file(&dir, "file1.yaml", "key1: value1\n"),
            create_test_file(&dir, "file2.yaml", "key2: value2\n"),
        ];

        let processor = FileProcessor::new();
        let emitter_config = EmitterConfig::new();
        let results = processor.format_files(&paths, &emitter_config);

        assert_eq!(results.len(), 2);
        assert!(results[0].1.is_ok());
        assert!(results[1].1.is_ok());
    }

    #[test]
    fn test_default_equals_new() {
        let processor1 = FileProcessor::new();
        let processor2 = FileProcessor::default();

        // Both constructors should yield the same mmap threshold.
        assert_eq!(
            processor1.config.mmap_threshold(),
            processor2.config.mmap_threshold()
        );
    }

    #[test]
    fn test_max_input_size_enforcement() {
        let dir = TempDir::new().unwrap();

        // 2000-byte file, larger than the 1000-byte limit configured below.
        let large_content = "x".repeat(2000);
        let path = create_test_file(&dir, "large.yaml", &large_content);

        let config = Config::new().with_max_input_size(1000);
        let processor = FileProcessor::with_config(config);

        let result = processor.parse_files(&[path]);

        // The oversized file must be counted as a failure.
        assert_eq!(result.total, 1);
        assert_eq!(result.failed, 1);
        assert!(!result.is_success());
    }

    #[test]
    fn test_file_within_size_limit() {
        let dir = TempDir::new().unwrap();

        // File smaller than the 1000-byte limit configured below.
        let content = "key: value\n";
        let path = create_test_file(&dir, "small.yaml", content);

        let config = Config::new().with_max_input_size(1000);
        let processor = FileProcessor::with_config(config);

        let result = processor.parse_files(&[path]);

        assert!(result.is_success());
    }
}