Skip to main content

morph_cli/core/pipeline/
executor.rs

1use std::path::{Path, PathBuf};
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::time::Instant;
4
5use anyhow::{Result, bail};
6use indicatif::MultiProgress;
7
8use crate::core::diff::preview::{SkipReason, TransformationReport};
9use crate::core::diff::renderer::DiffRenderer;
10use crate::core::pipeline::context::{
11    ExecutionEvent, IncompatibleRecipe, PipelineContext, PipelineStage, RecipeOrderer, StageTiming,
12    IncompatibilityReason, validate_recipe_order,
13};
14use crate::core::recipe::{DetectionReport, Recipe, TransformMode, TransformOptions};
15use crate::utils::terminal;
16
17pub struct PipelineExecutor {
18    stages: Vec<PipelineStage>,
19    recipe_names: Vec<String>,
20    context: PipelineContext,
21    events: Vec<ExecutionEvent>,
22    orderer: RecipeOrderer,
23    pub max_files: usize,
24    pub max_duration_seconds: u64,
25    pub tag_filter: Option<String>,
26    pub session_id: Option<String>,
27    pub session_options: Option<crate::core::session::SessionOptions>,
28}
29
30impl PipelineExecutor {
31    pub fn new(project_root: PathBuf) -> Self {
32        Self {
33            stages: Vec::new(),
34            recipe_names: Vec::new(),
35            context: PipelineContext::new(project_root),
36            events: Vec::new(),
37            orderer: RecipeOrderer::new(),
38            max_files: 10000,
39            max_duration_seconds: 300,
40            tag_filter: None,
41            session_id: None,
42            session_options: None,
43        }
44    }
45
46    pub fn with_limits(mut self, max_files: usize, max_duration_seconds: u64) -> Self {
47        self.max_files = max_files;
48        self.max_duration_seconds = max_duration_seconds;
49        self
50    }
51
52    pub fn with_tag(mut self, tag: Option<String>) -> Self {
53        self.tag_filter = tag;
54        self
55    }
56
57    pub fn with_session(mut self, session_id: String, options: crate::core::session::SessionOptions) -> Self {
58        self.session_id = Some(session_id);
59        self.session_options = Some(options);
60        self
61    }
62
63    #[allow(dead_code)]
64    pub fn with_ordering(mut self, orderer: RecipeOrderer) -> Self {
65        self.orderer = orderer;
66        self
67    }
68
69    pub fn add_recipe(mut self, recipe_name: &str) -> Self {
70        self.recipe_names.push(recipe_name.to_string());
71        self
72    }
73
74    pub fn execute(
75        mut self,
76        root: &Path,
77        mode: TransformMode,
78        review: bool,
79        autofix: bool,
80        registry: &crate::core::registry::RecipeRegistry,
81        verbose: bool,
82    ) -> Result<PipelineSummary> {
83        let recipe_refs: Vec<_> = self
84            .recipe_names
85            .iter()
86            .filter_map(|name| registry.find(name))
87            .collect();
88
89        let all_issues = validate_recipe_order(&recipe_refs);
90
91        // Separate hard failures from soft ordering-hint warnings.
92        let hard_failures: Vec<_> = all_issues
93            .iter()
94            .filter(|i| i.reason != IncompatibilityReason::OrderingHintViolation)
95            .collect();
96        let ordering_warnings: Vec<_> = all_issues
97            .iter()
98            .filter(|i| i.reason == IncompatibilityReason::OrderingHintViolation)
99            .collect();
100
101        if !hard_failures.is_empty() {
102            for inc in &hard_failures {
103                eprintln!(
104                    "{} {}",
105                    terminal::warning_prefix(),
106                    format_validation_error(inc)
107                );
108            }
109            bail!("Recipe compatibility validation failed");
110        }
111
112        // Emit soft warnings for ordering hint violations (non-fatal).
113        for inc in &ordering_warnings {
114            eprintln!(
115                "{} Ordering hint: {} should run before {} (hint from recipe metadata).",
116                terminal::warning_prefix(),
117                inc.recipe_b,
118                inc.recipe_a,
119            );
120        }
121
122        // Auto-build orderer from recipe metadata hints and merge with any manually set orderer.
123        let metadata_orderer = RecipeOrderer::from_metadata(&recipe_refs);
124        for dep in metadata_orderer.dependencies {
125            self.orderer.dependencies.push(dep);
126        }
127
128        let ordered = self.orderer.order(&recipe_refs);
129
130        let total_files = std::sync::Arc::new(AtomicUsize::new(0));
131        let mut all_changed_files: Vec<PathBuf> = Vec::new();
132        let mut all_skipped_files: Vec<PathBuf> = Vec::new();
133        let mut all_unsupported: Vec<(PathBuf, Vec<String>)> = Vec::new();
134        let mut failed_stages: Vec<String> = Vec::new();
135        let mut timings: Vec<(String, u64)> = Vec::new();
136        let mut stage_inputs: Option<Vec<PathBuf>> = None;
137        let mut completed_recipes: Vec<String> = Vec::new();
138
139        let _multi = MultiProgress::new();
140        let pipeline_start = Instant::now();
141
142        for recipe in &ordered {
143            if pipeline_start.elapsed().as_secs() > self.max_duration_seconds {
144                println!(
145                    "{} {}",
146                    terminal::warning_prefix(),
147                    terminal::explain_skip("Global execution timeout exceeded")
148                );
149                self.events.push(ExecutionEvent::StageSkipped {
150                    recipe: recipe.metadata().name.to_string(),
151                    reason: "timeout".to_string(),
152                });
153                break;
154            }
155
156            if total_files.load(Ordering::Relaxed) >= self.max_files {
157                println!(
158                    "{} {}",
159                    terminal::warning_prefix(),
160                    terminal::explain_skip("Global file limit reached")
161                );
162                self.events.push(ExecutionEvent::StageSkipped {
163                    recipe: recipe.metadata().name.to_string(),
164                    reason: "file limit".to_string(),
165                });
166                break;
167            }
168
169            let metadata = recipe.metadata();
170
171            println!();
172            println!("{}", terminal::label("═".repeat(50).as_str()));
173            println!("Stage: {}", terminal::label(metadata.name));
174            println!("{}", metadata.description);
175            println!("{}", terminal::label("─".repeat(50).as_str()));
176
177            self.events.push(ExecutionEvent::StageStarted {
178                recipe: metadata.name.to_string(),
179            });
180
181            let detect_start = Instant::now();
182            let input_label = stage_inputs
183                .as_ref()
184                .map(|inputs| format!("{} modified file(s)", inputs.len()))
185                .unwrap_or_else(|| root.display().to_string());
186            let spinner = terminal::spinner(&format!("Scanning {input_label} with {}", metadata.name));
187
188            let mut detect_result = match stage_inputs.as_ref() {
189                Some(inputs) => detect_inputs(*recipe, inputs, &spinner),
190                None => recipe.detect(root, &spinner),
191            };
192            spinner.finish_and_clear();
193
194            if let Ok(ref mut report) = detect_result {
195                for analysis in &mut report.analyses {
196                    let is_risky = analysis.classification == crate::core::recipe::FileClassification::Risky;
197                    analysis.tags = crate::core::recipe::compute_tags_for_file(&analysis.path, None, &analysis.detected_patterns, is_risky, false);
198                }
199                if let Some(ref tag) = self.tag_filter {
200                    report.analyses.retain(|analysis| analysis.tags.iter().any(|t| t == tag));
201                }
202            }
203
204            if let Err(e) = &detect_result {
205                    eprintln!(
206                        "{} Detect failed for {}: {}",
207                        terminal::warning_prefix(),
208                        metadata.name,
209                        e
210                    );
211            }
212
213            let detect_ms = detect_start.elapsed().as_millis() as u64;
214            timings.push((metadata.name.to_string(), detect_ms));
215
216            let transform_result: Result<_> = match &detect_result {
217                Ok(report) => {
218                    for analysis in &report.analyses {
219                        self.context
220                            .add_file(analysis.path.clone(), analysis.clone());
221                        total_files.fetch_add(1, Ordering::SeqCst);
222                    }
223
224                    let (format, prettier, no_format) = if let Some(ref opts) = self.session_options {
225                        (opts.format, opts.prettier, opts.no_format)
226                    } else {
227                        (false, false, false)
228                    };
229                    let transform_start = Instant::now();
230                    let result = recipe.transform(report, TransformOptions { mode, review, autofix, format, prettier, no_format });
231                    let transform_ms = transform_start.elapsed().as_millis() as u64;
232
233                    if verbose {
234                        println!(
235                            "{} detect: {}ms, transform: {}ms",
236                            terminal::label("timing"),
237                            detect_ms,
238                            transform_ms
239                        );
240                    }
241
242                    result.map(|r| (r, transform_ms))
243                }
244                Err(_) => Err(anyhow::anyhow!("Skipping transform due to detect failure")),
245            };
246
247            let mut stage = PipelineStage {
248                recipe_name: metadata.name,
249                detect_result: detect_result.ok(),
250                transform_result: None,
251                timing: StageTiming::default(),
252            };
253            stage.timing.detect_ms = detect_ms;
254
255            match transform_result {
256                Ok((report, transform_ms)) => {
257                    stage.timing.transform_ms = transform_ms;
258                    stage.timing.total_ms = detect_ms + transform_ms;
259                    let files_scanned = stage
260                        .detect_result
261                        .as_ref()
262                        .map(|report| report.total_files)
263                        .unwrap_or_default();
264                    let skipped_count = stage
265                        .detect_result
266                        .as_ref()
267                        .map(|report| report.skipped_files.len() + report.failed_files.len())
268                        .unwrap_or_default()
269                        + report.skipped_files.len();
270
271                    self.events.push(ExecutionEvent::StageCompleted {
272                        recipe: metadata.name.to_string(),
273                        duration_ms: stage.timing.total_ms,
274                    });
275
276                    all_changed_files.extend(report.changed_files.clone());
277                    for skipped in &report.skipped_files {
278                        all_skipped_files.push(skipped.path.clone());
279                    }
280                    for unsupported in &report.unsupported_patterns {
281                        all_unsupported
282                            .push((unsupported.path.clone(), unsupported.patterns.clone()));
283                    }
284
285                    for path in &report.changed_files {
286                        self.context.mark_stage_passed(path, metadata.name);
287                    }
288
289                    if autofix && mode == TransformMode::Write {
290                        for path in &report.changed_files {
291                            let _ = crate::core::ast::cleanup::run_autofix(path);
292                        }
293                    }
294
295                    println!(
296                        "{} scanned: {}, modified: {}, skipped: {}",
297                        terminal::label("stage summary"),
298                        files_scanned,
299                        report.changed_files.len(),
300                        skipped_count
301                    );
302                    stage_inputs = Some(report.changed_files.clone());
303                    stage.transform_result = Some(report);
304                }
305                Err(e) => {
306                    failed_stages.push(metadata.name.to_string());
307                    self.events.push(ExecutionEvent::StageFailed {
308                        recipe: metadata.name.to_string(),
309                        error: e.to_string(),
310                    });
311
312                    eprintln!(
313                        "{} Transform failed for {}: {}",
314                        terminal::warning_prefix(),
315                        metadata.name,
316                        e
317                    );
318                }
319            }
320
321            self.stages.push(stage);
322
323            completed_recipes.push(metadata.name.to_string());
324
325            if let (Some(sid), Some(opts)) = (&self.session_id, &self.session_options) {
326                let remaining_recipes: Vec<String> = ordered.iter()
327                    .skip(completed_recipes.len())
328                    .map(|r| r.metadata().name.to_string())
329                    .collect();
330
331                let checkpoint_id = format!("{}-stage-{}", sid, completed_recipes.len());
332                let checkpoint = crate::core::session::MigrationCheckpoint {
333                    id: checkpoint_id.clone(),
334                    session_id: sid.clone(),
335                    completed_recipes: completed_recipes.clone(),
336                    remaining_recipes,
337                    modified_files: all_changed_files.clone(),
338                    options: opts.clone(),
339                    target_path: root.to_path_buf(),
340                    timestamp: crate::core::session::current_timestamp(),
341                };
342
343                let store = crate::core::session::CheckpointStore::new(&self.context.project_root);
344                if let Err(e) = store.save(&checkpoint) {
345                    eprintln!("Warning: Failed to save checkpoint: {}", e);
346                } else {
347                    println!(
348                        "{} Checkpoint created: {}",
349                        terminal::success_prefix(),
350                        checkpoint_id
351                    );
352                }
353            }
354        }
355
356        let mut diff_report = TransformationReport::new();
357        for path in &all_changed_files {
358            diff_report
359                .changed_files
360                .push(crate::core::diff::preview::ChangedFile {
361                    path: path.clone(),
362                    lines_added: 1,
363                    lines_removed: 1,
364                    preview: None,
365                });
366        }
367        for path in &all_skipped_files {
368            diff_report
369                .skipped_files
370                .push(crate::core::diff::preview::SkippedFile {
371                    path: path.clone(),
372                    reason: SkipReason::NoChanges,
373                });
374        }
375        diff_report.finish();
376
377        let renderer = DiffRenderer::new(crate::core::diff::preview::PreviewConfig {
378            max_lines: 100,
379            show_line_numbers: true,
380            summary_only: false,
381            verbose,
382        });
383
384        if mode == TransformMode::DryRun && !diff_report.changed_files.is_empty() {
385            println!();
386            println!("{}", terminal::label("Transform Preview"));
387            renderer.render_report(&diff_report);
388        }
389
390        Ok(PipelineSummary {
391            stages_executed: ordered.len(),
392            stages_failed: failed_stages,
393            total_files_processed: total_files.load(Ordering::SeqCst),
394            total_changed_files: all_changed_files.len(),
395            total_skipped_files: all_skipped_files.len(),
396            total_unsupported: all_unsupported.len(),
397            modified_files: all_changed_files,
398            timings,
399            incompatibilities: hard_failures.into_iter().cloned().collect(),
400            events: self.events,
401            stages: self.stages,
402        })
403    }
404
405    #[allow(dead_code)]
406    pub fn get_context(&self) -> &PipelineContext {
407        &self.context
408    }
409
410    #[allow(dead_code)]
411    pub fn get_events(&self) -> &[ExecutionEvent] {
412        &self.events
413    }
414}
415
416fn format_validation_error(issue: &IncompatibleRecipe) -> String {
417    match issue.reason {
418        IncompatibilityReason::DuplicateRecipe => {
419            format!("duplicate recipe `{}`", issue.recipe_a)
420        }
421        IncompatibilityReason::IncompatibleRecipe => {
422            format!(
423                "`{}` is incompatible with `{}`",
424                issue.recipe_a, issue.recipe_b
425            )
426        }
427        IncompatibilityReason::MissingRequiredRecipe => {
428            format!(
429                "`{}` requires `{}` to run first",
430                issue.recipe_a, issue.recipe_b
431            )
432        }
433        IncompatibilityReason::RequiredRecipeOutOfOrder => {
434            format!(
435                "`{}` must run after required recipe `{}`",
436                issue.recipe_a, issue.recipe_b
437            )
438        }
439        IncompatibilityReason::OrderingHintViolation => {
440            format!(
441                "`{}` ordering hint: should run before `{}`",
442                issue.recipe_a, issue.recipe_b
443            )
444        }
445    }
446}
447
448fn detect_inputs(
449    recipe: &dyn Recipe,
450    inputs: &[PathBuf],
451    progress: &indicatif::ProgressBar,
452) -> Result<DetectionReport> {
453    let mut merged = DetectionReport::default();
454
455    for input in inputs {
456        if !input.exists() {
457            continue;
458        }
459
460        let report = recipe.detect(input, progress)?;
461        merged.total_files += report.total_files;
462        merged.parseable_files += report.parseable_files;
463        merged.analyses.extend(report.analyses);
464        merged.skipped_files.extend(report.skipped_files);
465        merged.failed_files.extend(report.failed_files);
466    }
467
468    Ok(merged)
469}
470
471#[derive(Debug)]
472#[allow(dead_code)]
473pub struct PipelineSummary {
474    pub stages_executed: usize,
475    pub stages_failed: Vec<String>,
476    pub total_files_processed: usize,
477    pub total_changed_files: usize,
478    pub total_skipped_files: usize,
479    pub total_unsupported: usize,
480    pub modified_files: Vec<PathBuf>,
481    pub timings: Vec<(String, u64)>,
482    pub incompatibilities: Vec<IncompatibleRecipe>,
483    pub events: Vec<ExecutionEvent>,
484    pub stages: Vec<PipelineStage>,
485}
486
487impl PipelineSummary {
488    pub fn print_summary(&self) {
489        println!();
490        println!("{}", terminal::label("═".repeat(50).as_str()));
491        println!("Pipeline Summary");
492        println!("{}", terminal::label("─".repeat(50).as_str()));
493
494        println!(
495            "{}: {} stages executed",
496            terminal::label("stages"),
497            self.stages_executed
498        );
499
500        if !self.stages_failed.is_empty() {
501            println!(
502                "{}: {}",
503                terminal::label("failed stages"),
504                self.stages_failed.join(", ")
505            );
506        }
507
508        println!(
509            "{}: {} files processed",
510            terminal::label("files"),
511            self.total_files_processed
512        );
513        println!(
514            "{}: {} changed, {} skipped, {} unsupported",
515            terminal::label("results"),
516            self.total_changed_files,
517            self.total_skipped_files,
518            self.total_unsupported
519        );
520
521        if !self.timings.is_empty() {
522            println!();
523            println!("{}", terminal::label("Timing"));
524            for (recipe, ms) in &self.timings {
525                println!("  {}: {}ms", recipe, ms);
526            }
527        }
528
529        let cache_stats = crate::core::cache::stats();
530        let reused = cache_stats.hits;
531        let skipped_reparses = reused;
532        let time_saved_ms = reused * 25;
533
534        println!();
535        println!("{}", terminal::label("Performance & Caching Summary"));
536        println!(
537            "  {}:  {}",
538            terminal::label("cached files reused"),
539            reused
540        );
541        println!(
542            "  {}:     {}",
543            terminal::label("skipped reparses"),
544            skipped_reparses
545        );
546        println!(
547            "  {}:   {}ms",
548            terminal::label("est time savings"),
549            time_saved_ms
550        );
551    }
552}
553
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Smoke test: building an orderer with one dependency must not panic.
    #[test]
    fn test_recipe_orderer() {
        let _orderer = RecipeOrderer::new().add_dependency("b", "c");
    }

    /// A fresh context tracks no files.
    #[test]
    fn test_pipeline_context_new() {
        let ctx = PipelineContext::new(PathBuf::from("/tmp"));
        assert!(ctx.files.is_empty());
    }

    /// A hand-built summary keeps every value it was constructed with.
    #[test]
    fn test_pipeline_summary_print() {
        let summary = PipelineSummary {
            stages_executed: 2,
            stages_failed: vec!["bad-recipe".to_string()],
            total_files_processed: 10,
            total_changed_files: 5,
            total_skipped_files: 3,
            total_unsupported: 2,
            modified_files: vec![PathBuf::from("a.js")],
            timings: vec![("a".to_string(), 100), ("b".to_string(), 200)],
            incompatibilities: vec![],
            events: vec![],
            stages: vec![],
        };

        assert_eq!(summary.stages_executed, 2);
        assert_eq!(summary.stages_failed, vec!["bad-recipe".to_string()]);
        assert_eq!(summary.total_files_processed, 10);
        assert_eq!(summary.total_changed_files, 5);
        assert_eq!(summary.total_skipped_files, 3);
        assert_eq!(summary.total_unsupported, 2);
        assert_eq!(summary.modified_files, vec![PathBuf::from("a.js")]);
        assert_eq!(
            summary.timings,
            vec![("a".to_string(), 100), ("b".to_string(), 200)]
        );
        assert!(summary.incompatibilities.is_empty());
        assert!(summary.events.is_empty());
        assert!(summary.stages.is_empty());
    }
}