1use std::path::{Path, PathBuf};
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::time::Instant;
4
5use anyhow::{Result, bail};
6use indicatif::MultiProgress;
7
8use crate::core::diff::preview::{SkipReason, TransformationReport};
9use crate::core::diff::renderer::DiffRenderer;
10use crate::core::pipeline::context::{
11 ExecutionEvent, IncompatibleRecipe, PipelineContext, PipelineStage, RecipeOrderer, StageTiming,
12 IncompatibilityReason, validate_recipe_order,
13};
14use crate::core::recipe::{DetectionReport, Recipe, TransformMode, TransformOptions};
15use crate::utils::terminal;
16
17pub struct PipelineExecutor {
18 stages: Vec<PipelineStage>,
19 recipe_names: Vec<String>,
20 context: PipelineContext,
21 events: Vec<ExecutionEvent>,
22 orderer: RecipeOrderer,
23 pub max_files: usize,
24 pub max_duration_seconds: u64,
25 pub tag_filter: Option<String>,
26 pub session_id: Option<String>,
27 pub session_options: Option<crate::core::session::SessionOptions>,
28}
29
30impl PipelineExecutor {
31 pub fn new(project_root: PathBuf) -> Self {
32 Self {
33 stages: Vec::new(),
34 recipe_names: Vec::new(),
35 context: PipelineContext::new(project_root),
36 events: Vec::new(),
37 orderer: RecipeOrderer::new(),
38 max_files: 10000,
39 max_duration_seconds: 300,
40 tag_filter: None,
41 session_id: None,
42 session_options: None,
43 }
44 }
45
46 pub fn with_limits(mut self, max_files: usize, max_duration_seconds: u64) -> Self {
47 self.max_files = max_files;
48 self.max_duration_seconds = max_duration_seconds;
49 self
50 }
51
52 pub fn with_tag(mut self, tag: Option<String>) -> Self {
53 self.tag_filter = tag;
54 self
55 }
56
57 pub fn with_session(mut self, session_id: String, options: crate::core::session::SessionOptions) -> Self {
58 self.session_id = Some(session_id);
59 self.session_options = Some(options);
60 self
61 }
62
63 #[allow(dead_code)]
64 pub fn with_ordering(mut self, orderer: RecipeOrderer) -> Self {
65 self.orderer = orderer;
66 self
67 }
68
69 pub fn add_recipe(mut self, recipe_name: &str) -> Self {
70 self.recipe_names.push(recipe_name.to_string());
71 self
72 }
73
74 pub fn execute(
75 mut self,
76 root: &Path,
77 mode: TransformMode,
78 review: bool,
79 autofix: bool,
80 registry: &crate::core::registry::RecipeRegistry,
81 verbose: bool,
82 ) -> Result<PipelineSummary> {
83 let recipe_refs: Vec<_> = self
84 .recipe_names
85 .iter()
86 .filter_map(|name| registry.find(name))
87 .collect();
88
89 let all_issues = validate_recipe_order(&recipe_refs);
90
91 let hard_failures: Vec<_> = all_issues
93 .iter()
94 .filter(|i| i.reason != IncompatibilityReason::OrderingHintViolation)
95 .collect();
96 let ordering_warnings: Vec<_> = all_issues
97 .iter()
98 .filter(|i| i.reason == IncompatibilityReason::OrderingHintViolation)
99 .collect();
100
101 if !hard_failures.is_empty() {
102 for inc in &hard_failures {
103 eprintln!(
104 "{} {}",
105 terminal::warning_prefix(),
106 format_validation_error(inc)
107 );
108 }
109 bail!("Recipe compatibility validation failed");
110 }
111
112 for inc in &ordering_warnings {
114 eprintln!(
115 "{} Ordering hint: {} should run before {} (hint from recipe metadata).",
116 terminal::warning_prefix(),
117 inc.recipe_b,
118 inc.recipe_a,
119 );
120 }
121
122 let metadata_orderer = RecipeOrderer::from_metadata(&recipe_refs);
124 for dep in metadata_orderer.dependencies {
125 self.orderer.dependencies.push(dep);
126 }
127
128 let ordered = self.orderer.order(&recipe_refs);
129
130 let total_files = std::sync::Arc::new(AtomicUsize::new(0));
131 let mut all_changed_files: Vec<PathBuf> = Vec::new();
132 let mut all_skipped_files: Vec<PathBuf> = Vec::new();
133 let mut all_unsupported: Vec<(PathBuf, Vec<String>)> = Vec::new();
134 let mut failed_stages: Vec<String> = Vec::new();
135 let mut timings: Vec<(String, u64)> = Vec::new();
136 let mut stage_inputs: Option<Vec<PathBuf>> = None;
137 let mut completed_recipes: Vec<String> = Vec::new();
138
139 let _multi = MultiProgress::new();
140 let pipeline_start = Instant::now();
141
142 for recipe in &ordered {
143 if pipeline_start.elapsed().as_secs() > self.max_duration_seconds {
144 println!(
145 "{} {}",
146 terminal::warning_prefix(),
147 terminal::explain_skip("Global execution timeout exceeded")
148 );
149 self.events.push(ExecutionEvent::StageSkipped {
150 recipe: recipe.metadata().name.to_string(),
151 reason: "timeout".to_string(),
152 });
153 break;
154 }
155
156 if total_files.load(Ordering::Relaxed) >= self.max_files {
157 println!(
158 "{} {}",
159 terminal::warning_prefix(),
160 terminal::explain_skip("Global file limit reached")
161 );
162 self.events.push(ExecutionEvent::StageSkipped {
163 recipe: recipe.metadata().name.to_string(),
164 reason: "file limit".to_string(),
165 });
166 break;
167 }
168
169 let metadata = recipe.metadata();
170
171 println!();
172 println!("{}", terminal::label("═".repeat(50).as_str()));
173 println!("Stage: {}", terminal::label(metadata.name));
174 println!("{}", metadata.description);
175 println!("{}", terminal::label("─".repeat(50).as_str()));
176
177 self.events.push(ExecutionEvent::StageStarted {
178 recipe: metadata.name.to_string(),
179 });
180
181 let detect_start = Instant::now();
182 let input_label = stage_inputs
183 .as_ref()
184 .map(|inputs| format!("{} modified file(s)", inputs.len()))
185 .unwrap_or_else(|| root.display().to_string());
186 let spinner = terminal::spinner(&format!("Scanning {input_label} with {}", metadata.name));
187
188 let mut detect_result = match stage_inputs.as_ref() {
189 Some(inputs) => detect_inputs(*recipe, inputs, &spinner),
190 None => recipe.detect(root, &spinner),
191 };
192 spinner.finish_and_clear();
193
194 if let Ok(ref mut report) = detect_result {
195 for analysis in &mut report.analyses {
196 let is_risky = analysis.classification == crate::core::recipe::FileClassification::Risky;
197 analysis.tags = crate::core::recipe::compute_tags_for_file(&analysis.path, None, &analysis.detected_patterns, is_risky, false);
198 }
199 if let Some(ref tag) = self.tag_filter {
200 report.analyses.retain(|analysis| analysis.tags.iter().any(|t| t == tag));
201 }
202 }
203
204 if let Err(e) = &detect_result {
205 eprintln!(
206 "{} Detect failed for {}: {}",
207 terminal::warning_prefix(),
208 metadata.name,
209 e
210 );
211 }
212
213 let detect_ms = detect_start.elapsed().as_millis() as u64;
214 timings.push((metadata.name.to_string(), detect_ms));
215
216 let transform_result: Result<_> = match &detect_result {
217 Ok(report) => {
218 for analysis in &report.analyses {
219 self.context
220 .add_file(analysis.path.clone(), analysis.clone());
221 total_files.fetch_add(1, Ordering::SeqCst);
222 }
223
224 let (format, prettier, no_format) = if let Some(ref opts) = self.session_options {
225 (opts.format, opts.prettier, opts.no_format)
226 } else {
227 (false, false, false)
228 };
229 let transform_start = Instant::now();
230 let result = recipe.transform(report, TransformOptions { mode, review, autofix, format, prettier, no_format });
231 let transform_ms = transform_start.elapsed().as_millis() as u64;
232
233 if verbose {
234 println!(
235 "{} detect: {}ms, transform: {}ms",
236 terminal::label("timing"),
237 detect_ms,
238 transform_ms
239 );
240 }
241
242 result.map(|r| (r, transform_ms))
243 }
244 Err(_) => Err(anyhow::anyhow!("Skipping transform due to detect failure")),
245 };
246
247 let mut stage = PipelineStage {
248 recipe_name: metadata.name,
249 detect_result: detect_result.ok(),
250 transform_result: None,
251 timing: StageTiming::default(),
252 };
253 stage.timing.detect_ms = detect_ms;
254
255 match transform_result {
256 Ok((report, transform_ms)) => {
257 stage.timing.transform_ms = transform_ms;
258 stage.timing.total_ms = detect_ms + transform_ms;
259 let files_scanned = stage
260 .detect_result
261 .as_ref()
262 .map(|report| report.total_files)
263 .unwrap_or_default();
264 let skipped_count = stage
265 .detect_result
266 .as_ref()
267 .map(|report| report.skipped_files.len() + report.failed_files.len())
268 .unwrap_or_default()
269 + report.skipped_files.len();
270
271 self.events.push(ExecutionEvent::StageCompleted {
272 recipe: metadata.name.to_string(),
273 duration_ms: stage.timing.total_ms,
274 });
275
276 all_changed_files.extend(report.changed_files.clone());
277 for skipped in &report.skipped_files {
278 all_skipped_files.push(skipped.path.clone());
279 }
280 for unsupported in &report.unsupported_patterns {
281 all_unsupported
282 .push((unsupported.path.clone(), unsupported.patterns.clone()));
283 }
284
285 for path in &report.changed_files {
286 self.context.mark_stage_passed(path, metadata.name);
287 }
288
289 if autofix && mode == TransformMode::Write {
290 for path in &report.changed_files {
291 let _ = crate::core::ast::cleanup::run_autofix(path);
292 }
293 }
294
295 println!(
296 "{} scanned: {}, modified: {}, skipped: {}",
297 terminal::label("stage summary"),
298 files_scanned,
299 report.changed_files.len(),
300 skipped_count
301 );
302 stage_inputs = Some(report.changed_files.clone());
303 stage.transform_result = Some(report);
304 }
305 Err(e) => {
306 failed_stages.push(metadata.name.to_string());
307 self.events.push(ExecutionEvent::StageFailed {
308 recipe: metadata.name.to_string(),
309 error: e.to_string(),
310 });
311
312 eprintln!(
313 "{} Transform failed for {}: {}",
314 terminal::warning_prefix(),
315 metadata.name,
316 e
317 );
318 }
319 }
320
321 self.stages.push(stage);
322
323 completed_recipes.push(metadata.name.to_string());
324
325 if let (Some(sid), Some(opts)) = (&self.session_id, &self.session_options) {
326 let remaining_recipes: Vec<String> = ordered.iter()
327 .skip(completed_recipes.len())
328 .map(|r| r.metadata().name.to_string())
329 .collect();
330
331 let checkpoint_id = format!("{}-stage-{}", sid, completed_recipes.len());
332 let checkpoint = crate::core::session::MigrationCheckpoint {
333 id: checkpoint_id.clone(),
334 session_id: sid.clone(),
335 completed_recipes: completed_recipes.clone(),
336 remaining_recipes,
337 modified_files: all_changed_files.clone(),
338 options: opts.clone(),
339 target_path: root.to_path_buf(),
340 timestamp: crate::core::session::current_timestamp(),
341 };
342
343 let store = crate::core::session::CheckpointStore::new(&self.context.project_root);
344 if let Err(e) = store.save(&checkpoint) {
345 eprintln!("Warning: Failed to save checkpoint: {}", e);
346 } else {
347 println!(
348 "{} Checkpoint created: {}",
349 terminal::success_prefix(),
350 checkpoint_id
351 );
352 }
353 }
354 }
355
356 let mut diff_report = TransformationReport::new();
357 for path in &all_changed_files {
358 diff_report
359 .changed_files
360 .push(crate::core::diff::preview::ChangedFile {
361 path: path.clone(),
362 lines_added: 1,
363 lines_removed: 1,
364 preview: None,
365 });
366 }
367 for path in &all_skipped_files {
368 diff_report
369 .skipped_files
370 .push(crate::core::diff::preview::SkippedFile {
371 path: path.clone(),
372 reason: SkipReason::NoChanges,
373 });
374 }
375 diff_report.finish();
376
377 let renderer = DiffRenderer::new(crate::core::diff::preview::PreviewConfig {
378 max_lines: 100,
379 show_line_numbers: true,
380 summary_only: false,
381 verbose,
382 });
383
384 if mode == TransformMode::DryRun && !diff_report.changed_files.is_empty() {
385 println!();
386 println!("{}", terminal::label("Transform Preview"));
387 renderer.render_report(&diff_report);
388 }
389
390 Ok(PipelineSummary {
391 stages_executed: ordered.len(),
392 stages_failed: failed_stages,
393 total_files_processed: total_files.load(Ordering::SeqCst),
394 total_changed_files: all_changed_files.len(),
395 total_skipped_files: all_skipped_files.len(),
396 total_unsupported: all_unsupported.len(),
397 modified_files: all_changed_files,
398 timings,
399 incompatibilities: hard_failures.into_iter().cloned().collect(),
400 events: self.events,
401 stages: self.stages,
402 })
403 }
404
405 #[allow(dead_code)]
406 pub fn get_context(&self) -> &PipelineContext {
407 &self.context
408 }
409
410 #[allow(dead_code)]
411 pub fn get_events(&self) -> &[ExecutionEvent] {
412 &self.events
413 }
414}
415
416fn format_validation_error(issue: &IncompatibleRecipe) -> String {
417 match issue.reason {
418 IncompatibilityReason::DuplicateRecipe => {
419 format!("duplicate recipe `{}`", issue.recipe_a)
420 }
421 IncompatibilityReason::IncompatibleRecipe => {
422 format!(
423 "`{}` is incompatible with `{}`",
424 issue.recipe_a, issue.recipe_b
425 )
426 }
427 IncompatibilityReason::MissingRequiredRecipe => {
428 format!(
429 "`{}` requires `{}` to run first",
430 issue.recipe_a, issue.recipe_b
431 )
432 }
433 IncompatibilityReason::RequiredRecipeOutOfOrder => {
434 format!(
435 "`{}` must run after required recipe `{}`",
436 issue.recipe_a, issue.recipe_b
437 )
438 }
439 IncompatibilityReason::OrderingHintViolation => {
440 format!(
441 "`{}` ordering hint: should run before `{}`",
442 issue.recipe_a, issue.recipe_b
443 )
444 }
445 }
446}
447
448fn detect_inputs(
449 recipe: &dyn Recipe,
450 inputs: &[PathBuf],
451 progress: &indicatif::ProgressBar,
452) -> Result<DetectionReport> {
453 let mut merged = DetectionReport::default();
454
455 for input in inputs {
456 if !input.exists() {
457 continue;
458 }
459
460 let report = recipe.detect(input, progress)?;
461 merged.total_files += report.total_files;
462 merged.parseable_files += report.parseable_files;
463 merged.analyses.extend(report.analyses);
464 merged.skipped_files.extend(report.skipped_files);
465 merged.failed_files.extend(report.failed_files);
466 }
467
468 Ok(merged)
469}
470
471#[derive(Debug)]
472#[allow(dead_code)]
473pub struct PipelineSummary {
474 pub stages_executed: usize,
475 pub stages_failed: Vec<String>,
476 pub total_files_processed: usize,
477 pub total_changed_files: usize,
478 pub total_skipped_files: usize,
479 pub total_unsupported: usize,
480 pub modified_files: Vec<PathBuf>,
481 pub timings: Vec<(String, u64)>,
482 pub incompatibilities: Vec<IncompatibleRecipe>,
483 pub events: Vec<ExecutionEvent>,
484 pub stages: Vec<PipelineStage>,
485}
486
487impl PipelineSummary {
488 pub fn print_summary(&self) {
489 println!();
490 println!("{}", terminal::label("═".repeat(50).as_str()));
491 println!("Pipeline Summary");
492 println!("{}", terminal::label("─".repeat(50).as_str()));
493
494 println!(
495 "{}: {} stages executed",
496 terminal::label("stages"),
497 self.stages_executed
498 );
499
500 if !self.stages_failed.is_empty() {
501 println!(
502 "{}: {}",
503 terminal::label("failed stages"),
504 self.stages_failed.join(", ")
505 );
506 }
507
508 println!(
509 "{}: {} files processed",
510 terminal::label("files"),
511 self.total_files_processed
512 );
513 println!(
514 "{}: {} changed, {} skipped, {} unsupported",
515 terminal::label("results"),
516 self.total_changed_files,
517 self.total_skipped_files,
518 self.total_unsupported
519 );
520
521 if !self.timings.is_empty() {
522 println!();
523 println!("{}", terminal::label("Timing"));
524 for (recipe, ms) in &self.timings {
525 println!(" {}: {}ms", recipe, ms);
526 }
527 }
528
529 let cache_stats = crate::core::cache::stats();
530 let reused = cache_stats.hits;
531 let skipped_reparses = reused;
532 let time_saved_ms = reused * 25;
533
534 println!();
535 println!("{}", terminal::label("Performance & Caching Summary"));
536 println!(
537 " {}: {}",
538 terminal::label("cached files reused"),
539 reused
540 );
541 println!(
542 " {}: {}",
543 terminal::label("skipped reparses"),
544 skipped_reparses
545 );
546 println!(
547 " {}: {}ms",
548 terminal::label("est time savings"),
549 time_saved_ms
550 );
551 }
552}
553
554#[cfg(test)]
555mod tests {
556 use super::*;
557 use std::path::PathBuf;
558
559 #[test]
560 fn test_recipe_orderer() {
561 let _orderer = RecipeOrderer::new().add_dependency("b", "c");
562 assert!(true);
563 }
564
565 #[test]
566 fn test_pipeline_context_new() {
567 let ctx = PipelineContext::new(PathBuf::from("/tmp"));
568 assert!(ctx.files.is_empty());
569 }
570
571 #[test]
572 fn test_pipeline_summary_print() {
573 let summary = PipelineSummary {
574 stages_executed: 2,
575 stages_failed: vec!["bad-recipe".to_string()],
576 total_files_processed: 10,
577 total_changed_files: 5,
578 total_skipped_files: 3,
579 total_unsupported: 2,
580 modified_files: vec![PathBuf::from("a.js")],
581 timings: vec![("a".to_string(), 100), ("b".to_string(), 200)],
582 incompatibilities: vec![],
583 events: vec![],
584 stages: vec![],
585 };
586 assert_eq!(summary.stages_executed, 2);
587 }
588}