1use std::io::Write;
4use std::path::{Path, PathBuf};
5use std::time::Instant;
6
7use fast_yaml_core::emitter::{Emitter, EmitterConfig};
8use rayon::prelude::*;
9
10use crate::config::Config;
11use crate::error::{Error, Result};
12use crate::io::SmartReader;
13use crate::result::{BatchResult, FileOutcome, FileResult};
14
15#[derive(Debug)]
48pub struct FileProcessor {
49 config: Config,
50 reader: SmartReader,
51}
52
53impl FileProcessor {
54 pub fn new() -> Self {
56 Self::with_config(Config::default())
57 }
58
59 pub const fn with_config(config: Config) -> Self {
61 let reader = SmartReader::with_threshold(config.mmap_threshold() as u64);
62
63 Self { config, reader }
64 }
65
66 pub fn process<F, R>(&self, paths: &[PathBuf], f: F) -> BatchResult
70 where
71 F: Fn(&Path, &str) -> Result<R> + Sync,
72 R: Send,
73 {
74 let batch_start = Instant::now();
75 let total = paths.len();
76
77 if total == 0 {
78 return BatchResult::new();
79 }
80
81 let results = if Self::should_use_sequential(paths) {
82 self.process_files_sequential(paths, &f)
83 } else {
84 self.process_files_parallel(paths, &f)
85 };
86
87 let mut batch = BatchResult::from_results(results);
88 batch.duration = batch_start.elapsed();
89 batch
90 }
91
92 pub fn parse_files(&self, paths: &[PathBuf]) -> BatchResult {
94 self.process(paths, |path, content| {
95 fast_yaml_core::Parser::parse_str(content)
96 .map_err(|source| Error::Parse { index: 0, source })?
97 .ok_or_else(|| Error::Format {
98 message: format!("empty document in {}", path.display()),
99 })?;
100 Ok(())
101 })
102 }
103
104 pub fn format_files(
106 &self,
107 paths: &[PathBuf],
108 emitter_config: &EmitterConfig,
109 ) -> Vec<(PathBuf, Result<String>)> {
110 let process_file = |path: &Path| -> Result<String> {
111 let file_content = self.reader.read(path)?;
112 let original = file_content.as_str()?;
113
114 Emitter::format_with_config(original, emitter_config).map_err(|e| Error::Format {
115 message: format!("{}: {}", path.display(), e),
116 })
117 };
118
119 if Self::should_use_sequential(paths) {
120 paths
121 .iter()
122 .map(|path| (path.clone(), process_file(path)))
123 .collect()
124 } else {
125 paths
126 .par_iter()
127 .map(|path| (path.clone(), process_file(path)))
128 .collect()
129 }
130 }
131
132 pub fn format_in_place(
134 &self,
135 paths: &[PathBuf],
136 emitter_config: &EmitterConfig,
137 ) -> BatchResult {
138 let batch_start = std::time::Instant::now();
139 let total = paths.len();
140
141 if total == 0 {
142 return BatchResult::new();
143 }
144
145 let results = if Self::should_use_sequential(paths) {
146 paths
147 .iter()
148 .map(|path| self.format_single_file(path, emitter_config))
149 .collect()
150 } else {
151 paths
152 .par_iter()
153 .map(|path| self.format_single_file(path, emitter_config))
154 .collect()
155 };
156
157 let mut batch = BatchResult::from_results(results);
158 batch.duration = batch_start.elapsed();
159 batch
160 }
161
162 fn format_single_file(&self, path: &Path, emitter_config: &EmitterConfig) -> FileResult {
164 let start = std::time::Instant::now();
165
166 let metadata = match std::fs::metadata(path) {
167 Ok(m) => m,
168 Err(source) => {
169 return FileResult::new(
170 path.to_path_buf(),
171 FileOutcome::Error {
172 error: Error::Io {
173 path: path.to_path_buf(),
174 source,
175 },
176 duration: start.elapsed(),
177 },
178 );
179 }
180 };
181
182 let file_size = metadata.len();
183 let max_size = self.config.max_input_size();
184
185 #[allow(clippy::cast_possible_truncation)]
186 let size = file_size as usize;
187
188 if file_size > max_size as u64 {
189 return FileResult::new(
190 path.to_path_buf(),
191 FileOutcome::Error {
192 error: Error::InputTooLarge {
193 size,
194 max: max_size,
195 },
196 duration: start.elapsed(),
197 },
198 );
199 }
200
201 let file_content = match self.reader.read(path) {
202 Ok(c) => c,
203 Err(error) => {
204 return FileResult::new(
205 path.to_path_buf(),
206 FileOutcome::Error {
207 error,
208 duration: start.elapsed(),
209 },
210 );
211 }
212 };
213
214 let content = match file_content.as_str() {
215 Ok(s) => s,
216 Err(error) => {
217 return FileResult::new(
218 path.to_path_buf(),
219 FileOutcome::Error {
220 error,
221 duration: start.elapsed(),
222 },
223 );
224 }
225 };
226
227 let formatted = match Emitter::format_with_config(content, emitter_config) {
228 Ok(f) => f,
229 Err(e) => {
230 return FileResult::new(
231 path.to_path_buf(),
232 FileOutcome::Error {
233 error: Error::Format {
234 message: format!("{}: {}", path.display(), e),
235 },
236 duration: start.elapsed(),
237 },
238 );
239 }
240 };
241
242 let changed = content != formatted;
243
244 if changed && let Err(error) = Self::write_file_atomic(path, &formatted) {
245 return FileResult::new(
246 path.to_path_buf(),
247 FileOutcome::Error {
248 error,
249 duration: start.elapsed(),
250 },
251 );
252 }
253
254 let duration = start.elapsed();
255 let outcome = if changed {
256 FileOutcome::Changed { duration }
257 } else {
258 FileOutcome::Success { duration }
259 };
260
261 FileResult::new(path.to_path_buf(), outcome)
262 }
263
264 fn process_files_parallel<F, R>(&self, paths: &[PathBuf], f: &F) -> Vec<FileResult>
266 where
267 F: Fn(&Path, &str) -> Result<R> + Sync,
268 R: Send,
269 {
270 paths
271 .par_iter()
272 .map(|path| self.process_single_file(path, f))
273 .collect()
274 }
275
276 fn process_files_sequential<F, R>(&self, paths: &[PathBuf], f: &F) -> Vec<FileResult>
278 where
279 F: Fn(&Path, &str) -> Result<R>,
280 {
281 paths
282 .iter()
283 .map(|path| self.process_single_file(path, f))
284 .collect()
285 }
286
287 fn process_single_file<F, R>(&self, path: &Path, f: &F) -> FileResult
289 where
290 F: Fn(&Path, &str) -> Result<R>,
291 {
292 let start = Instant::now();
293
294 match self.process_file_content(path, f) {
295 Ok(()) => {
296 let duration = start.elapsed();
297 FileResult::new(path.to_path_buf(), FileOutcome::Success { duration })
298 }
299 Err(error) => FileResult::new(
300 path.to_path_buf(),
301 FileOutcome::Error {
302 error,
303 duration: start.elapsed(),
304 },
305 ),
306 }
307 }
308
309 fn process_file_content<F, R>(&self, path: &Path, f: &F) -> Result<()>
311 where
312 F: Fn(&Path, &str) -> Result<R>,
313 {
314 let metadata = std::fs::metadata(path).map_err(|source| Error::Io {
315 path: path.to_path_buf(),
316 source,
317 })?;
318
319 let file_size = metadata.len();
320 let max_size = self.config.max_input_size();
321
322 if file_size > max_size as u64 {
323 #[allow(clippy::cast_possible_truncation)]
324 let size = file_size as usize;
325 return Err(Error::InputTooLarge {
326 size,
327 max: max_size,
328 });
329 }
330
331 let file_content = self.reader.read(path)?;
332 let content = file_content.as_str()?;
333
334 f(path, content)?;
335 Ok(())
336 }
337
338 fn write_file_atomic(path: &Path, content: &str) -> Result<()> {
345 let dir = path.parent().ok_or_else(|| Error::Write {
346 path: path.to_path_buf(),
347 source: std::io::Error::new(std::io::ErrorKind::NotFound, "no parent directory"),
348 })?;
349
350 let mut temp = tempfile::NamedTempFile::new_in(dir).map_err(|source| Error::Write {
351 path: path.to_path_buf(),
352 source,
353 })?;
354
355 temp.write_all(content.as_bytes())
356 .map_err(|source| Error::Write {
357 path: path.to_path_buf(),
358 source,
359 })?;
360
361 temp.persist(path).map_err(|e| Error::Write {
362 path: path.to_path_buf(),
363 source: e.error,
364 })?;
365
366 Ok(())
367 }
368
369 fn should_use_sequential(paths: &[PathBuf]) -> bool {
378 let file_count = paths.len();
379
380 if file_count < 4 {
381 return true;
382 }
383
384 let total_size: u64 = paths
385 .iter()
386 .filter_map(|p| std::fs::metadata(p).ok())
387 .map(|m| m.len())
388 .sum();
389
390 total_size < 1_000_000 && file_count < 10
391 }
392}
393
394impl Default for FileProcessor {
395 fn default() -> Self {
396 Self::new()
397 }
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Writes `content` to `dir/name` and returns the resulting path.
    fn create_test_file(dir: &TempDir, name: &str, content: &str) -> PathBuf {
        let path = dir.path().join(name);
        fs::write(&path, content).unwrap();
        path
    }

    #[test]
    fn test_file_processor_new() {
        let _processor = FileProcessor::new();
    }

    #[test]
    fn test_file_processor_with_config() {
        let config = Config::new().with_workers(Some(4));
        let _processor = FileProcessor::with_config(config);
    }

    #[test]
    fn test_process_single_file() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "test.yaml", "key: value\n");

        let processor = FileProcessor::new();
        let result = processor.parse_files(&[path]);

        assert_eq!(result.total, 1);
        assert!(result.is_success());
    }

    #[test]
    fn test_process_multiple_files() {
        let dir = TempDir::new().unwrap();
        let paths = vec![
            create_test_file(&dir, "file1.yaml", "key1: value1\n"),
            create_test_file(&dir, "file2.yaml", "key2: value2\n"),
            create_test_file(&dir, "file3.yaml", "key3: value3\n"),
        ];

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 3);
        assert!(result.is_success());
    }

    #[test]
    fn test_process_empty_batch() {
        let processor = FileProcessor::new();
        let result = processor.parse_files(&[]);

        assert_eq!(result.total, 0);
        assert!(result.is_success());
    }

    #[test]
    fn test_process_with_errors() {
        let dir = TempDir::new().unwrap();
        let paths = vec![
            create_test_file(&dir, "valid.yaml", "key: value\n"),
            create_test_file(&dir, "invalid.yaml", "invalid: [\n"),
        ];

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 2);
        assert!(!result.is_success());
        assert!(result.failed >= 1);
    }

    #[test]
    fn test_parse_files() {
        let dir = TempDir::new().unwrap();
        let paths = vec![create_test_file(&dir, "test.yaml", "key: value\n")];

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert!(result.is_success());
    }

    #[test]
    fn test_format_files() {
        let dir = TempDir::new().unwrap();
        let paths = vec![create_test_file(&dir, "test.yaml", "key: value\n")];

        let processor = FileProcessor::new();
        let emitter_config = EmitterConfig::new();
        let results = processor.format_files(&paths, &emitter_config);

        assert_eq!(results.len(), 1);
        assert!(results[0].1.is_ok());
    }

    #[test]
    fn test_format_in_place() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "test.yaml", "key: value\n");

        let processor = FileProcessor::new();
        let emitter_config = EmitterConfig::new();
        let result = processor.format_in_place(std::slice::from_ref(&path), &emitter_config);

        assert_eq!(result.total, 1);
    }

    #[test]
    fn test_atomic_write() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.yaml");
        fs::write(&path, "old content").unwrap();

        FileProcessor::write_file_atomic(&path, "new content").unwrap();

        let content = fs::read_to_string(&path).unwrap();
        assert_eq!(content, "new content");
    }

    #[test]
    fn test_large_file_with_mmap() {
        let dir = TempDir::new().unwrap();

        let large_content = "key: value\n".repeat(100_000);
        let path = create_test_file(&dir, "large.yaml", &large_content);

        let config = Config::new().with_mmap_threshold(1024);
        let processor = FileProcessor::with_config(config);

        let result = processor.parse_files(&[path]);
        assert!(result.is_success());
    }

    #[test]
    fn test_sequential_threshold_exactly_9_files() {
        let dir = TempDir::new().unwrap();
        let mut paths = Vec::new();

        for i in 0..9 {
            paths.push(create_test_file(
                &dir,
                &format!("file{i}.yaml"),
                "key: value\n",
            ));
        }

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 9);
        assert!(result.is_success());
    }

    #[test]
    fn test_sequential_threshold_exactly_10_files() {
        let dir = TempDir::new().unwrap();
        let mut paths = Vec::new();

        for i in 0..10 {
            paths.push(create_test_file(
                &dir,
                &format!("file{i}.yaml"),
                "key: value\n",
            ));
        }

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 10);
        assert!(result.is_success());
    }

    #[test]
    fn test_many_files_parallel() {
        let dir = TempDir::new().unwrap();
        let mut paths = Vec::new();

        for i in 0..50 {
            paths.push(create_test_file(
                &dir,
                &format!("file{i}.yaml"),
                &format!("index: {i}\n"),
            ));
        }

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 50);
        assert!(result.is_success());
    }

    #[test]
    fn test_format_in_place_tracking() {
        let dir = TempDir::new().unwrap();

        let path = create_test_file(&dir, "test.yaml", "key: value\n");

        let processor = FileProcessor::new();
        let emitter_config = EmitterConfig::new();
        let result = processor.format_in_place(std::slice::from_ref(&path), &emitter_config);

        assert_eq!(result.total, 1);
        assert!(result.changed <= result.total);
    }

    #[test]
    fn test_format_changed_file() {
        let dir = TempDir::new().unwrap();

        let path = create_test_file(&dir, "unformatted.yaml", "key: value\n");

        let processor = FileProcessor::new();
        let emitter_config = EmitterConfig::new();
        let result = processor.format_in_place(std::slice::from_ref(&path), &emitter_config);

        assert_eq!(result.total, 1);
        // Valid YAML in a writable temp dir must format without error, whether
        // or not the emitter actually changes the text.
        assert_eq!(result.failed, 0);
    }

    #[test]
    fn test_all_files_fail() {
        let dir = TempDir::new().unwrap();
        let mut paths = Vec::new();

        for i in 0..5 {
            paths.push(create_test_file(
                &dir,
                &format!("invalid{i}.yaml"),
                "invalid: [\n",
            ));
        }

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 5);
        assert_eq!(result.failed, 5);
        assert_eq!(result.success, 0);
        assert!(!result.is_success());
    }

    #[test]
    fn test_mixed_file_sizes() {
        let dir = TempDir::new().unwrap();

        let small = create_test_file(&dir, "small.yaml", "key: value\n");
        let large = create_test_file(&dir, "large.yaml", &"key: value\n".repeat(100_000));

        let config = Config::new().with_mmap_threshold(1024);
        let processor = FileProcessor::with_config(config);

        let result = processor.parse_files(&[small, large]);
        assert_eq!(result.total, 2);
        assert!(result.is_success());
    }

    #[test]
    fn test_partial_batch_failure() {
        let dir = TempDir::new().unwrap();

        let paths = vec![
            create_test_file(&dir, "valid1.yaml", "key1: value1\n"),
            create_test_file(&dir, "invalid.yaml", "broken: [\n"),
            create_test_file(&dir, "valid2.yaml", "key2: value2\n"),
        ];

        let processor = FileProcessor::new();
        let result = processor.parse_files(&paths);

        assert_eq!(result.total, 3);
        assert_eq!(result.success, 2);
        assert_eq!(result.failed, 1);
        assert!(!result.is_success());
    }

    #[test]
    fn test_atomic_write_temp_file_cleanup() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.yaml");

        FileProcessor::write_file_atomic(&path, "content1").unwrap();
        assert_eq!(fs::read_to_string(&path).unwrap(), "content1");

        FileProcessor::write_file_atomic(&path, "content2").unwrap();
        assert_eq!(fs::read_to_string(&path).unwrap(), "content2");

        // Each temporary file must have been renamed into place, not left behind.
        let entries: Vec<_> = fs::read_dir(dir.path())
            .unwrap()
            .map(|entry| entry.unwrap().file_name())
            .collect();
        assert_eq!(entries, vec![std::ffi::OsString::from("test.yaml")]);
    }

    #[test]
    #[cfg(unix)]
    fn test_write_to_readonly_directory() {
        use std::os::unix::fs::PermissionsExt;

        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.yaml");

        let mut perms = fs::metadata(dir.path()).unwrap().permissions();
        perms.set_mode(0o444);
        fs::set_permissions(dir.path(), perms).unwrap();

        // Privileged users (e.g. root in CI containers) bypass directory mode
        // bits. Detect that so the test does not fail spuriously.
        let bypasses_permissions = fs::write(dir.path().join("probe"), "").is_ok();

        let result = FileProcessor::write_file_atomic(&path, "content");

        let mut perms = fs::metadata(dir.path()).unwrap().permissions();
        perms.set_mode(0o755);
        let _ = fs::set_permissions(dir.path(), perms);

        if bypasses_permissions {
            return;
        }
        assert!(result.is_err());
    }

    #[test]
    fn test_process_custom_operation() {
        let dir = TempDir::new().unwrap();
        let path = create_test_file(&dir, "test.yaml", "key: value\n");

        let processor = FileProcessor::new();

        let result = processor.process(&[path], |_path, content| Ok(content.len()));

        assert!(result.is_success());
    }

    #[test]
    fn test_format_files_returns_results() {
        let dir = TempDir::new().unwrap();
        let paths = vec![
            create_test_file(&dir, "file1.yaml", "key1: value1\n"),
            create_test_file(&dir, "file2.yaml", "key2: value2\n"),
        ];

        let processor = FileProcessor::new();
        let emitter_config = EmitterConfig::new();
        let results = processor.format_files(&paths, &emitter_config);

        assert_eq!(results.len(), 2);
        assert!(results[0].1.is_ok());
        assert!(results[1].1.is_ok());
    }

    #[test]
    fn test_default_equals_new() {
        let processor1 = FileProcessor::new();
        let processor2 = FileProcessor::default();

        assert_eq!(
            processor1.config.mmap_threshold(),
            processor2.config.mmap_threshold()
        );
    }

    #[test]
    fn test_max_input_size_enforcement() {
        let dir = TempDir::new().unwrap();

        let large_content = "x".repeat(2000);
        let path = create_test_file(&dir, "large.yaml", &large_content);

        let config = Config::new().with_max_input_size(1000);
        let processor = FileProcessor::with_config(config);

        let result = processor.parse_files(&[path]);

        assert_eq!(result.total, 1);
        assert_eq!(result.failed, 1);
        assert!(!result.is_success());
    }

    #[test]
    fn test_file_within_size_limit() {
        let dir = TempDir::new().unwrap();

        let content = "key: value\n";
        let path = create_test_file(&dir, "small.yaml", content);

        let config = Config::new().with_max_input_size(1000);
        let processor = FileProcessor::with_config(config);

        let result = processor.parse_files(&[path]);

        assert!(result.is_success());
    }
}