1use std::path::Path;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::time::Instant;
6
7use fast_yaml_core::emitter::{Emitter, EmitterConfig};
8use rayon::prelude::*;
9
10use super::config::ProcessingConfig;
11use super::discovery::DiscoveredFile;
12use super::error::ProcessingError;
13use super::reader::SmartFileReader;
14use super::result::{BatchResult, FileOutcome, FileResult};
15
16pub struct BatchProcessor {
21 config: ProcessingConfig,
22 emitter_config: EmitterConfig,
23 reader: SmartFileReader,
24}
25
26impl BatchProcessor {
27 pub fn new(config: ProcessingConfig) -> Self {
29 let emitter_config = EmitterConfig::new()
30 .with_indent(config.indent as usize)
31 .with_width(config.width);
32
33 let reader = SmartFileReader::with_threshold(config.mmap_threshold as u64);
34
35 Self {
36 config,
37 emitter_config,
38 reader,
39 }
40 }
41
42 pub fn process(&self, files: &[DiscoveredFile]) -> BatchResult {
47 let batch_start = Instant::now();
48 let total = files.len();
49
50 if total == 0 {
51 return BatchResult::new();
52 }
53
54 let results = if self.should_use_custom_pool() {
55 self.process_with_custom_pool(files)
56 } else {
57 self.process_with_default_pool(files)
58 };
59
60 let mut batch = BatchResult::from_results(results);
61 batch.duration = batch_start.elapsed();
62 batch
63 }
64
65 fn process_with_custom_pool(&self, files: &[DiscoveredFile]) -> Vec<FileResult> {
67 rayon::ThreadPoolBuilder::new()
68 .num_threads(self.config.effective_workers())
69 .build()
70 .map_or_else(
71 |_| {
72 self.process_files_parallel(files)
74 },
75 |pool| pool.install(|| self.process_files_parallel(files)),
76 )
77 }
78
79 fn process_with_default_pool(&self, files: &[DiscoveredFile]) -> Vec<FileResult> {
84 if files.len() < 10 {
86 self.process_files_sequential(files)
87 } else {
88 self.process_files_parallel(files)
89 }
90 }
91
92 fn process_files_parallel(&self, files: &[DiscoveredFile]) -> Vec<FileResult> {
94 let processed = AtomicUsize::new(0);
95 let total = files.len();
96
97 files
98 .par_iter()
99 .map(|file| {
100 let result = self.process_single_file(&file.path);
101
102 if self.config.verbose {
103 let n = processed.fetch_add(1, Ordering::Relaxed) + 1;
104 let msg = format!("[{n}/{total}] {}", file.path.display());
106 eprintln!("{msg}");
107 }
108
109 result
110 })
111 .collect()
112 }
113
114 fn process_files_sequential(&self, files: &[DiscoveredFile]) -> Vec<FileResult> {
119 let total = files.len();
120
121 files
122 .iter()
123 .enumerate()
124 .map(|(i, file)| {
125 let result = self.process_single_file(&file.path);
126
127 if self.config.verbose {
128 eprintln!("[{}/{}] {}", i + 1, total, file.path.display());
129 }
130
131 result
132 })
133 .collect()
134 }
135
136 fn process_single_file(&self, path: &Path) -> FileResult {
138 let start = Instant::now();
139
140 match self.format_file(path) {
141 Ok(changed) => {
142 let duration = start.elapsed();
143 let outcome = if self.config.dry_run && changed {
144 FileOutcome::Skipped { duration }
145 } else if changed {
146 FileOutcome::Formatted {
147 changed: true,
148 duration,
149 }
150 } else {
151 FileOutcome::Unchanged { duration }
152 };
153 FileResult::new(path.to_path_buf(), outcome)
154 }
155 Err(error) => FileResult::new(
156 path.to_path_buf(),
157 FileOutcome::Failed {
158 error,
159 duration: start.elapsed(),
160 },
161 ),
162 }
163 }
164
165 fn format_file(&self, path: &Path) -> Result<bool, ProcessingError> {
170 let file_content = self.reader.read(path)?;
171 let original = file_content.as_str()?;
172
173 let formatted = Emitter::format_with_config(original, &self.emitter_config)
174 .map_err(|e| ProcessingError::FormatError(format!("{}: {}", path.display(), e)))?;
175
176 let changed = original != formatted;
177
178 if changed && self.config.in_place && !self.config.dry_run {
179 Self::write_file_atomic(path, &formatted)?;
180 }
181
182 Ok(changed)
183 }
184
185 fn write_file_atomic(path: &Path, content: &str) -> Result<(), ProcessingError> {
190 let temp_path = path.with_extension("tmp");
191
192 std::fs::write(&temp_path, content).map_err(ProcessingError::WriteError)?;
193
194 std::fs::rename(&temp_path, path).map_err(ProcessingError::WriteError)?;
195
196 Ok(())
197 }
198
199 const fn should_use_custom_pool(&self) -> bool {
201 self.config.workers != 0
202 }
203}
204
205pub fn process_batch(files: &[DiscoveredFile], config: ProcessingConfig) -> BatchResult {
207 let processor = BatchProcessor::new(config);
208 processor.process(files)
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Writes `content` to `name` inside `dir` and returns it as a file that
    /// was discovered via a direct path.
    fn create_test_file(dir: &TempDir, name: &str, content: &str) -> DiscoveredFile {
        let target = dir.path().join(name);
        fs::write(&target, content).unwrap();

        let origin = super::super::discovery::DiscoveryOrigin::DirectPath;
        DiscoveredFile {
            path: target,
            origin,
        }
    }

    /// A well-formed file is processed successfully with default options.
    #[test]
    fn test_process_single_file_success() {
        let tmp = TempDir::new().unwrap();
        let input = create_test_file(&tmp, "test.yaml", "key: value\n");

        let processor = BatchProcessor::new(ProcessingConfig::new());
        let outcome = processor.process_single_file(&input.path);

        assert!(outcome.is_success());
    }

    /// With `in_place` disabled the file on disk must stay untouched.
    #[test]
    fn test_process_single_file_no_write_when_not_in_place() {
        let tmp = TempDir::new().unwrap();
        let input = create_test_file(&tmp, "test.yaml", "key: value\n");

        let processor = BatchProcessor::new(ProcessingConfig::new().with_in_place(false));

        let before = fs::read_to_string(&input.path).unwrap();
        let outcome = processor.process_single_file(&input.path);
        let after = fs::read_to_string(&input.path).unwrap();

        assert_eq!(before, after);
        assert!(outcome.is_success());
    }

    /// A dry run never modifies the file, even when `in_place` is requested.
    #[test]
    fn test_process_single_file_dry_run() {
        let tmp = TempDir::new().unwrap();
        let input = create_test_file(&tmp, "test.yaml", "key:value\n");

        let options = ProcessingConfig::new()
            .with_dry_run(true)
            .with_in_place(true);
        let processor = BatchProcessor::new(options);

        let before = fs::read_to_string(&input.path).unwrap();
        let _ = processor.process_single_file(&input.path);
        let after = fs::read_to_string(&input.path).unwrap();

        assert_eq!(before, after);
    }

    /// When an in-place run reports a change, the file content must differ
    /// from what was there before.
    #[test]
    fn test_process_single_file_in_place() {
        let tmp = TempDir::new().unwrap();
        let input = create_test_file(&tmp, "test.yaml", "key: value\n");

        let processor = BatchProcessor::new(ProcessingConfig::new().with_in_place(true));

        let before = fs::read_to_string(&input.path).unwrap();
        let outcome = processor.process_single_file(&input.path);
        let after = fs::read_to_string(&input.path).unwrap();

        if outcome.outcome.was_changed() {
            assert_ne!(before, after);
        }
    }

    /// Three valid files produce a fully successful batch of size three.
    #[test]
    fn test_batch_result_aggregation() {
        let tmp = TempDir::new().unwrap();

        let inputs = vec![
            create_test_file(&tmp, "file1.yaml", "key: value\n"),
            create_test_file(&tmp, "file2.yaml", "list:\n - item\n"),
            create_test_file(&tmp, "file3.yaml", "valid: yaml\n"),
        ];

        let summary = BatchProcessor::new(ProcessingConfig::new()).process(&inputs);

        assert_eq!(summary.total, 3);
        assert_eq!(summary.success_count(), 3);
        assert!(summary.is_success());
    }

    /// Default options do not request a dedicated thread pool.
    #[test]
    fn test_effective_workers_default() {
        let processor = BatchProcessor::new(ProcessingConfig::new());

        assert!(!processor.should_use_custom_pool());
    }

    /// An explicit worker count requests a dedicated pool of that size.
    #[test]
    fn test_effective_workers_custom() {
        let processor = BatchProcessor::new(ProcessingConfig::new().with_workers(4));

        assert!(processor.should_use_custom_pool());
        assert_eq!(processor.config.effective_workers(), 4);
    }

    /// An empty batch is a success with a total of zero.
    #[test]
    fn test_process_empty_batch() {
        let processor = BatchProcessor::new(ProcessingConfig::new());

        let summary = processor.process(&[]);

        assert_eq!(summary.total, 0);
        assert!(summary.is_success());
    }

    /// A batch containing an invalid file and a missing file reports failures.
    #[test]
    fn test_process_batch_with_error() {
        let tmp = TempDir::new().unwrap();

        let inputs = vec![
            create_test_file(&tmp, "file1.yaml", "key: value\n"),
            create_test_file(&tmp, "file2.yaml", "invalid: [\n"),
            // Never created on disk.
            DiscoveredFile {
                path: tmp.path().join("nonexistent.yaml"),
                origin: super::super::discovery::DiscoveryOrigin::DirectPath,
            },
        ];

        let summary = BatchProcessor::new(ProcessingConfig::new()).process(&inputs);

        assert_eq!(summary.total, 3);
        assert!(summary.failed >= 1);
        assert!(!summary.is_success());
        assert!(!summary.errors.is_empty());
    }

    /// Unparseable YAML surfaces as an error from `format_file`.
    #[test]
    fn test_format_file_parse_error() {
        let tmp = TempDir::new().unwrap();
        let input = create_test_file(&tmp, "invalid.yaml", "invalid: [\n");

        let processor = BatchProcessor::new(ProcessingConfig::new());

        assert!(processor.format_file(&input.path).is_err());
    }

    /// The atomic write replaces the content and leaves no `.tmp` sibling.
    #[test]
    fn test_atomic_write() {
        let tmp = TempDir::new().unwrap();
        let target = tmp.path().join("test.yaml");
        fs::write(&target, "old content").unwrap();

        BatchProcessor::write_file_atomic(&target, "new content").unwrap();

        let written = fs::read_to_string(&target).unwrap();
        assert_eq!(written, "new content");

        let leftover = target.with_extension("tmp");
        assert!(!leftover.exists());
    }

    /// `process_batch` behaves like constructing a processor and running it.
    #[test]
    fn test_process_batch_convenience_function() {
        let tmp = TempDir::new().unwrap();
        let inputs = vec![create_test_file(&tmp, "file1.yaml", "key: value\n")];

        let summary = process_batch(&inputs, ProcessingConfig::new());

        assert_eq!(summary.total, 1);
        assert!(summary.is_success());
    }

    /// A file far above a 1 KiB mmap threshold is still processed successfully.
    #[test]
    fn test_large_file_processing() {
        let tmp = TempDir::new().unwrap();

        let body = "key: value\n".repeat(100_000);
        let input = create_test_file(&tmp, "large.yaml", &body);

        let processor = BatchProcessor::new(ProcessingConfig::new().with_mmap_threshold(1024));
        let outcome = processor.process_single_file(&input.path);

        assert!(outcome.is_success());
    }
}