use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use anyhow::{Result, bail};
use indicatif::MultiProgress;
use crate::core::diff::preview::{SkipReason, TransformationReport};
use crate::core::diff::renderer::DiffRenderer;
use crate::core::pipeline::context::{
ExecutionEvent, IncompatibleRecipe, PipelineContext, PipelineStage, RecipeOrderer, StageTiming,
IncompatibilityReason, validate_recipe_order,
};
use crate::core::recipe::{DetectionReport, Recipe, TransformMode, TransformOptions};
use crate::utils::terminal;
/// Builder-style driver that runs a list of recipes as sequential pipeline stages.
///
/// Configure it with the `with_*` / `add_recipe` methods, then consume it with
/// `execute`, which returns a `PipelineSummary`.
pub struct PipelineExecutor {
/// One record per stage that `execute` has run, in execution order.
stages: Vec<PipelineStage>,
/// Recipe names queued through `add_recipe`; resolved against the registry in `execute`.
recipe_names: Vec<String>,
/// Shared pipeline state created from the project root; `execute` registers
/// analysed files and passed stages on it.
context: PipelineContext,
/// Stage lifecycle events (started / completed / failed / skipped) recorded by `execute`.
events: Vec<ExecutionEvent>,
/// Ordering rules; `execute` appends the dependencies derived from recipe
/// metadata before ordering the recipes.
orderer: RecipeOrderer,
/// `execute` stops starting new stages once this many file analyses have been
/// counted. `new` sets it to 10000.
pub max_files: usize,
/// `execute` stops starting new stages once the elapsed whole seconds exceed
/// this value. `new` sets it to 300.
pub max_duration_seconds: u64,
/// When set, `execute` keeps only the file analyses that carry this tag.
pub tag_filter: Option<String>,
/// When set together with `session_options`, `execute` saves a checkpoint
/// after every stage.
pub session_id: Option<String>,
/// Supplies the `format` / `prettier` / `no_format` flags passed to each
/// transform, and is stored in every checkpoint.
pub session_options: Option<crate::core::session::SessionOptions>,
}
impl PipelineExecutor {
/// Creates an executor rooted at `project_root` with no recipes queued.
///
/// Defaults: at most 10000 files, at most 300 seconds, no tag filter and no
/// session.
pub fn new(project_root: PathBuf) -> Self {
    let context = PipelineContext::new(project_root);
    let orderer = RecipeOrderer::new();
    Self {
        context,
        orderer,
        stages: vec![],
        recipe_names: vec![],
        events: vec![],
        max_files: 10_000,
        max_duration_seconds: 300,
        tag_filter: None,
        session_id: None,
        session_options: None,
    }
}
pub fn with_limits(mut self, max_files: usize, max_duration_seconds: u64) -> Self {
self.max_files = max_files;
self.max_duration_seconds = max_duration_seconds;
self
}
pub fn with_tag(mut self, tag: Option<String>) -> Self {
self.tag_filter = tag;
self
}
pub fn with_session(mut self, session_id: String, options: crate::core::session::SessionOptions) -> Self {
self.session_id = Some(session_id);
self.session_options = Some(options);
self
}
#[allow(dead_code)]
pub fn with_ordering(mut self, orderer: RecipeOrderer) -> Self {
self.orderer = orderer;
self
}
/// Appends `recipe_name` to the list of recipes this executor will run.
pub fn add_recipe(mut self, recipe_name: &str) -> Self {
    let owned_name = String::from(recipe_name);
    self.recipe_names.push(owned_name);
    self
}
/// Runs every queued recipe as a pipeline stage and returns a summary.
///
/// Steps, in order:
/// 1. Resolve the queued recipe names through `registry` and validate their
///    combination with `validate_recipe_order`.
/// 2. Order the recipes using this executor's orderer plus the dependencies
///    derived from recipe metadata.
/// 3. For each recipe: detect, tag and optionally filter the analyses,
///    transform, record the outcome, and save a checkpoint when a session is
///    attached.
/// 4. In dry-run mode, render a preview of the changed files.
///
/// The first stage scans `root`; every later stage scans only the files
/// changed by the most recent successful stage.
///
/// # Parameters
/// - `root`: path scanned by the first stage and stored in checkpoints.
/// - `mode`: transform mode forwarded to every recipe.
/// - `review`, `autofix`: forwarded to every transform; `autofix` also runs
///   the autofix cleanup on changed files when `mode` is `Write`.
/// - `registry`: source of the recipe implementations.
/// - `verbose`: prints per-stage timing and is forwarded to the preview config.
///
/// # Errors
/// Returns an error only when recipe validation reports an issue other than
/// an ordering-hint violation. Detect and transform failures are printed,
/// recorded in `stages_failed`, and do not abort the pipeline.
pub fn execute(
    mut self,
    root: &Path,
    mode: TransformMode,
    review: bool,
    autofix: bool,
    registry: &crate::core::registry::RecipeRegistry,
    verbose: bool,
) -> Result<PipelineSummary> {
    // Names the registry cannot resolve are dropped here without a diagnostic.
    let recipe_refs: Vec<_> = self
        .recipe_names
        .iter()
        .filter_map(|name| registry.find(name))
        .collect();

    // Ordering-hint violations are warnings; every other issue is fatal.
    let all_issues = validate_recipe_order(&recipe_refs);
    let hard_failures: Vec<_> = all_issues
        .iter()
        .filter(|i| i.reason != IncompatibilityReason::OrderingHintViolation)
        .collect();
    let ordering_warnings: Vec<_> = all_issues
        .iter()
        .filter(|i| i.reason == IncompatibilityReason::OrderingHintViolation)
        .collect();
    if !hard_failures.is_empty() {
        for inc in &hard_failures {
            eprintln!(
                "{} {}",
                terminal::warning_prefix(),
                format_validation_error(inc)
            );
        }
        bail!("Recipe compatibility validation failed");
    }
    for inc in &ordering_warnings {
        eprintln!(
            "{} Ordering hint: {} should run before {} (hint from recipe metadata).",
            terminal::warning_prefix(),
            inc.recipe_b,
            inc.recipe_a,
        );
    }

    // Merge metadata-derived dependencies into the configured orderer.
    let metadata_orderer = RecipeOrderer::from_metadata(&recipe_refs);
    for dep in metadata_orderer.dependencies {
        self.orderer.dependencies.push(dep);
    }
    let ordered = self.orderer.order(&recipe_refs);

    let total_files = std::sync::Arc::new(AtomicUsize::new(0));
    let mut all_changed_files: Vec<PathBuf> = Vec::new();
    let mut all_skipped_files: Vec<PathBuf> = Vec::new();
    let mut all_unsupported: Vec<(PathBuf, Vec<String>)> = Vec::new();
    let mut failed_stages: Vec<String> = Vec::new();
    let mut timings: Vec<(String, u64)> = Vec::new();
    // `None` until a stage succeeds; afterwards the files that stage changed.
    let mut stage_inputs: Option<Vec<PathBuf>> = None;
    let mut completed_recipes: Vec<String> = Vec::new();
    let _multi = MultiProgress::new();
    let pipeline_start = Instant::now();

    for recipe in &ordered {
        // Global guards: both stop the whole pipeline, not just this stage.
        if pipeline_start.elapsed().as_secs() > self.max_duration_seconds {
            println!(
                "{} {}",
                terminal::warning_prefix(),
                terminal::explain_skip("Global execution timeout exceeded")
            );
            self.events.push(ExecutionEvent::StageSkipped {
                recipe: recipe.metadata().name.to_string(),
                reason: "timeout".to_string(),
            });
            break;
        }
        if total_files.load(Ordering::SeqCst) >= self.max_files {
            println!(
                "{} {}",
                terminal::warning_prefix(),
                terminal::explain_skip("Global file limit reached")
            );
            self.events.push(ExecutionEvent::StageSkipped {
                recipe: recipe.metadata().name.to_string(),
                reason: "file limit".to_string(),
            });
            break;
        }

        let metadata = recipe.metadata();
        println!();
        println!("{}", terminal::label("═".repeat(50).as_str()));
        println!("Stage: {}", terminal::label(metadata.name));
        println!("{}", metadata.description);
        println!("{}", terminal::label("─".repeat(50).as_str()));
        self.events.push(ExecutionEvent::StageStarted {
            recipe: metadata.name.to_string(),
        });

        // --- Detect ---
        let detect_start = Instant::now();
        let input_label = stage_inputs
            .as_ref()
            .map(|inputs| format!("{} modified file(s)", inputs.len()))
            .unwrap_or_else(|| root.display().to_string());
        let spinner = terminal::spinner(&format!("Scanning {input_label} with {}", metadata.name));
        let mut detect_result = match stage_inputs.as_ref() {
            Some(inputs) => detect_inputs(*recipe, inputs, &spinner),
            None => recipe.detect(root, &spinner),
        };
        spinner.finish_and_clear();

        // Tags are recomputed for every analysis before the optional tag filter runs.
        if let Ok(ref mut report) = detect_result {
            for analysis in &mut report.analyses {
                let is_risky = analysis.classification == crate::core::recipe::FileClassification::Risky;
                analysis.tags = crate::core::recipe::compute_tags_for_file(&analysis.path, None, &analysis.detected_patterns, is_risky, false);
            }
            if let Some(ref tag) = self.tag_filter {
                report.analyses.retain(|analysis| analysis.tags.iter().any(|t| t == tag));
            }
        }
        if let Err(e) = &detect_result {
            eprintln!(
                "{} Detect failed for {}: {}",
                terminal::warning_prefix(),
                metadata.name,
                e
            );
        }
        let detect_ms = detect_start.elapsed().as_millis() as u64;
        // Only the detection time is recorded in the summary timings.
        timings.push((metadata.name.to_string(), detect_ms));

        // --- Transform ---
        let transform_result: Result<_> = match &detect_result {
            Ok(report) => {
                for analysis in &report.analyses {
                    self.context
                        .add_file(analysis.path.clone(), analysis.clone());
                    total_files.fetch_add(1, Ordering::SeqCst);
                }
                let (format, prettier, no_format) = if let Some(ref opts) = self.session_options {
                    (opts.format, opts.prettier, opts.no_format)
                } else {
                    (false, false, false)
                };
                let transform_start = Instant::now();
                let result = recipe.transform(report, TransformOptions { mode, review, autofix, format, prettier, no_format });
                let transform_ms = transform_start.elapsed().as_millis() as u64;
                if verbose {
                    println!(
                        "{} detect: {}ms, transform: {}ms",
                        terminal::label("timing"),
                        detect_ms,
                        transform_ms
                    );
                }
                result.map(|r| (r, transform_ms))
            }
            // A detect failure is reported through the same failure path as a transform failure.
            Err(_) => Err(anyhow::anyhow!("Skipping transform due to detect failure")),
        };

        let mut stage = PipelineStage {
            recipe_name: metadata.name,
            detect_result: detect_result.ok(),
            transform_result: None,
            timing: StageTiming::default(),
        };
        stage.timing.detect_ms = detect_ms;

        match transform_result {
            Ok((report, transform_ms)) => {
                stage.timing.transform_ms = transform_ms;
                stage.timing.total_ms = detect_ms + transform_ms;
                let files_scanned = stage
                    .detect_result
                    .as_ref()
                    .map(|report| report.total_files)
                    .unwrap_or_default();
                // Skipped = skipped or failed during detect, plus skipped during transform.
                let skipped_count = stage
                    .detect_result
                    .as_ref()
                    .map(|report| report.skipped_files.len() + report.failed_files.len())
                    .unwrap_or_default()
                    + report.skipped_files.len();
                self.events.push(ExecutionEvent::StageCompleted {
                    recipe: metadata.name.to_string(),
                    duration_ms: stage.timing.total_ms,
                });
                all_changed_files.extend(report.changed_files.clone());
                for skipped in &report.skipped_files {
                    all_skipped_files.push(skipped.path.clone());
                }
                for unsupported in &report.unsupported_patterns {
                    all_unsupported
                        .push((unsupported.path.clone(), unsupported.patterns.clone()));
                }
                for path in &report.changed_files {
                    self.context.mark_stage_passed(path, metadata.name);
                }
                if autofix && mode == TransformMode::Write {
                    for path in &report.changed_files {
                        // The autofix result is deliberately ignored here.
                        let _ = crate::core::ast::cleanup::run_autofix(path);
                    }
                }
                println!(
                    "{} scanned: {}, modified: {}, skipped: {}",
                    terminal::label("stage summary"),
                    files_scanned,
                    report.changed_files.len(),
                    skipped_count
                );
                // The next stage only looks at what this stage changed.
                stage_inputs = Some(report.changed_files.clone());
                stage.transform_result = Some(report);
            }
            Err(e) => {
                failed_stages.push(metadata.name.to_string());
                self.events.push(ExecutionEvent::StageFailed {
                    recipe: metadata.name.to_string(),
                    error: e.to_string(),
                });
                eprintln!(
                    "{} Transform failed for {}: {}",
                    terminal::warning_prefix(),
                    metadata.name,
                    e
                );
            }
        }
        self.stages.push(stage);
        // Failed stages are also counted here, so the `skip` below stays
        // aligned with the position in `ordered`.
        completed_recipes.push(metadata.name.to_string());

        // --- Checkpoint (only when a session is attached) ---
        if let (Some(sid), Some(opts)) = (&self.session_id, &self.session_options) {
            let remaining_recipes: Vec<String> = ordered.iter()
                .skip(completed_recipes.len())
                .map(|r| r.metadata().name.to_string())
                .collect();
            let checkpoint_id = format!("{}-stage-{}", sid, completed_recipes.len());
            let checkpoint = crate::core::session::MigrationCheckpoint {
                id: checkpoint_id.clone(),
                session_id: sid.clone(),
                completed_recipes: completed_recipes.clone(),
                remaining_recipes,
                modified_files: all_changed_files.clone(),
                options: opts.clone(),
                target_path: root.to_path_buf(),
                timestamp: crate::core::session::current_timestamp(),
            };
            let store = crate::core::session::CheckpointStore::new(&self.context.project_root);
            if let Err(e) = store.save(&checkpoint) {
                eprintln!("Warning: Failed to save checkpoint: {}", e);
            } else {
                println!(
                    "{} Checkpoint created: {}",
                    terminal::success_prefix(),
                    checkpoint_id
                );
            }
        }
    }

    // Build the preview report. Line counts are fixed placeholders of 1/1.
    let mut diff_report = TransformationReport::new();
    for path in &all_changed_files {
        diff_report
            .changed_files
            .push(crate::core::diff::preview::ChangedFile {
                path: path.clone(),
                lines_added: 1,
                lines_removed: 1,
                preview: None,
            });
    }
    for path in &all_skipped_files {
        diff_report
            .skipped_files
            .push(crate::core::diff::preview::SkippedFile {
                path: path.clone(),
                reason: SkipReason::NoChanges,
            });
    }
    diff_report.finish();
    let renderer = DiffRenderer::new(crate::core::diff::preview::PreviewConfig {
        max_lines: 100,
        show_line_numbers: true,
        summary_only: false,
        verbose,
    });
    if mode == TransformMode::DryRun && !diff_report.changed_files.is_empty() {
        println!();
        println!("{}", terminal::label("Transform Preview"));
        renderer.render_report(&diff_report);
    }

    Ok(PipelineSummary {
        // Count the stages that actually ran: the loop may stop early on the
        // timeout or file-limit guard, so `ordered.len()` would over-report.
        stages_executed: self.stages.len(),
        stages_failed: failed_stages,
        total_files_processed: total_files.load(Ordering::SeqCst),
        total_changed_files: all_changed_files.len(),
        total_skipped_files: all_skipped_files.len(),
        total_unsupported: all_unsupported.len(),
        modified_files: all_changed_files,
        timings,
        // Always empty on this path: any hard failure has already bailed above.
        incompatibilities: hard_failures.into_iter().cloned().collect(),
        events: self.events,
        stages: self.stages,
    })
}
#[allow(dead_code)]
pub fn get_context(&self) -> &PipelineContext {
&self.context
}
/// Borrows the execution events recorded so far, oldest first.
#[allow(dead_code)]
pub fn get_events(&self) -> &[ExecutionEvent] {
    self.events.as_slice()
}
}
/// Renders a recipe validation issue as a one-line, human-readable message.
///
/// In every message `recipe_a` is the recipe the issue is about and, where
/// present, `recipe_b` is the recipe it relates to.
fn format_validation_error(issue: &IncompatibleRecipe) -> String {
    let (subject, other) = (&issue.recipe_a, &issue.recipe_b);
    match issue.reason {
        IncompatibilityReason::DuplicateRecipe => {
            format!("duplicate recipe `{subject}`")
        }
        IncompatibilityReason::IncompatibleRecipe => {
            format!("`{subject}` is incompatible with `{other}`")
        }
        IncompatibilityReason::MissingRequiredRecipe => {
            format!("`{subject}` requires `{other}` to run first")
        }
        IncompatibilityReason::RequiredRecipeOutOfOrder => {
            format!("`{subject}` must run after required recipe `{other}`")
        }
        // Direction matches the ordering-hint warning printed by the pipeline
        // executor ("<recipe_b> should run before <recipe_a>") and the other
        // variants above, where `recipe_b` is the one that goes first.
        IncompatibilityReason::OrderingHintViolation => {
            format!("`{subject}` ordering hint: should run after `{other}`")
        }
    }
}
/// Runs `recipe.detect` on each existing path in `inputs` and merges the
/// per-path reports into one.
///
/// Paths that no longer exist are skipped. The counters are summed and the
/// analyses, skipped files and failed files are concatenated in input order.
///
/// # Errors
/// Stops at, and returns, the first error produced by `recipe.detect`.
fn detect_inputs(
    recipe: &dyn Recipe,
    inputs: &[PathBuf],
    progress: &indicatif::ProgressBar,
) -> Result<DetectionReport> {
    inputs
        .iter()
        .filter(|candidate| candidate.exists())
        .try_fold(
            DetectionReport::default(),
            |mut combined, candidate| -> Result<DetectionReport> {
                let partial = recipe.detect(candidate, progress)?;
                combined.total_files += partial.total_files;
                combined.parseable_files += partial.parseable_files;
                combined.analyses.extend(partial.analyses);
                combined.skipped_files.extend(partial.skipped_files);
                combined.failed_files.extend(partial.failed_files);
                Ok(combined)
            },
        )
}
/// Result of a pipeline run, as returned by `PipelineExecutor::execute`.
#[derive(Debug)]
#[allow(dead_code)]
pub struct PipelineSummary {
/// Number of pipeline stages reported as executed.
pub stages_executed: usize,
/// Names of the recipes whose stage ended in a detect or transform failure.
pub stages_failed: Vec<String>,
/// Number of file analyses handed to the transform step, summed over all stages.
pub total_files_processed: usize,
/// Length of `modified_files`.
pub total_changed_files: usize,
/// Number of files the transform steps reported as skipped, summed over all stages.
pub total_skipped_files: usize,
/// Number of unsupported-pattern entries reported by the transform steps.
pub total_unsupported: usize,
/// Paths reported as changed by the transform steps, in stage order.
/// NOTE(review): a path changed by several stages appears to be listed once per stage — confirm.
pub modified_files: Vec<PathBuf>,
/// `(recipe name, milliseconds)` pairs, one per stage. As populated by
/// `execute`, the value is the detection time only.
pub timings: Vec<(String, u64)>,
/// Validation issues carried over from the run.
pub incompatibilities: Vec<IncompatibleRecipe>,
/// Stage lifecycle events in the order they were recorded.
pub events: Vec<ExecutionEvent>,
/// Per-stage detect/transform results and timings.
pub stages: Vec<PipelineStage>,
}
impl PipelineSummary {
/// Prints the pipeline summary to stdout: stage counts, failed stages (if
/// any), file totals, per-recipe timings (if any) and cache statistics.
pub fn print_summary(&self) {
    let heavy_rule = "═".repeat(50);
    let light_rule = "─".repeat(50);

    println!();
    println!("{}", terminal::label(heavy_rule.as_str()));
    println!("Pipeline Summary");
    println!("{}", terminal::label(light_rule.as_str()));

    let stages_label = terminal::label("stages");
    println!("{}: {} stages executed", stages_label, self.stages_executed);

    let has_failures = !self.stages_failed.is_empty();
    if has_failures {
        let failed_list = self.stages_failed.join(", ");
        println!("{}: {}", terminal::label("failed stages"), failed_list);
    }

    let files_label = terminal::label("files");
    println!("{}: {} files processed", files_label, self.total_files_processed);

    let results_label = terminal::label("results");
    println!(
        "{}: {} changed, {} skipped, {} unsupported",
        results_label,
        self.total_changed_files,
        self.total_skipped_files,
        self.total_unsupported
    );

    if !self.timings.is_empty() {
        println!();
        println!("{}", terminal::label("Timing"));
        self.timings
            .iter()
            .for_each(|(recipe, ms)| println!(" {recipe}: {ms}ms"));
    }

    // Both cache lines report the hit count; the time saving is 25 ms per hit.
    let hits = crate::core::cache::stats().hits;
    let estimated_saving_ms = hits * 25;
    println!();
    println!("{}", terminal::label("Performance & Caching Summary"));
    println!(" {}: {}", terminal::label("cached files reused"), hits);
    println!(" {}: {}", terminal::label("skipped reparses"), hits);
    println!(
        " {}: {}ms",
        terminal::label("est time savings"),
        estimated_saving_ms
    );
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Smoke test: building an orderer with one dependency must not panic.
    #[test]
    fn test_recipe_orderer() {
        let _orderer = RecipeOrderer::new().add_dependency("b", "c");
    }

    /// A freshly created context tracks no files.
    #[test]
    fn test_pipeline_context_new() {
        let ctx = PipelineContext::new(PathBuf::from("/tmp"));
        assert!(ctx.files.is_empty());
    }

    /// A hand-built summary exposes exactly the values it was constructed with.
    #[test]
    fn test_pipeline_summary_print() {
        let summary = PipelineSummary {
            stages_executed: 2,
            stages_failed: vec!["bad-recipe".to_string()],
            total_files_processed: 10,
            total_changed_files: 5,
            total_skipped_files: 3,
            total_unsupported: 2,
            modified_files: vec![PathBuf::from("a.js")],
            timings: vec![("a".to_string(), 100), ("b".to_string(), 200)],
            incompatibilities: vec![],
            events: vec![],
            stages: vec![],
        };
        assert_eq!(summary.stages_executed, 2);
        assert_eq!(summary.stages_failed, vec!["bad-recipe".to_string()]);
        assert_eq!(summary.total_files_processed, 10);
        assert_eq!(summary.total_changed_files, 5);
        assert_eq!(summary.total_skipped_files, 3);
        assert_eq!(summary.total_unsupported, 2);
        assert_eq!(summary.modified_files, vec![PathBuf::from("a.js")]);
        assert_eq!(
            summary.timings,
            vec![("a".to_string(), 100), ("b".to_string(), 200)]
        );
        assert!(summary.incompatibilities.is_empty());
        assert!(summary.events.is_empty());
        assert!(summary.stages.is_empty());
    }
}