use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use colored::{Color, Colorize};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;
use crate::cache::Cache;
use crate::config::TargetConfig;
use crate::error::RunnerError;
use crate::graph::{ProjectGraph, TaskGraph, TaskId};
/// Policy that decides how the runner reacts once a task has failed.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum RunMode {
/// Stop scheduling new tasks after the first failure; tasks that are
/// already running are still awaited. This is the default mode.
#[default]
FailFast,
/// Keep scheduling tasks even after one of them has failed.
Continue,
}
/// Outcome of a single task, whether it was executed or served from the cache.
#[derive(Debug, Clone)]
pub struct TaskResult {
/// Identifier of the task this result belongs to.
pub task_id: TaskId,
/// `true` when the command exited successfully or the task was a cache hit.
pub success: bool,
/// Exit code of the command; `None` when the process could not be started
/// or waited on, or when the exit status carries no code.
pub exit_code: Option<i32>,
/// Wall-clock time spent on the task (`Duration::ZERO` for cache hits).
pub duration: Duration,
/// `true` when the task was skipped because of a cache hit.
pub cached: bool,
}
/// Aggregated outcome of one `TaskRunner::run` invocation.
#[derive(Debug, Clone)]
pub struct RunResult {
/// Number of tasks that succeeded, including cache hits.
pub success_count: usize,
/// Number of tasks that failed.
pub failure_count: usize,
/// Number of tasks in the graph that neither succeeded nor failed
/// (total tasks minus successes and failures).
pub skipped_count: usize,
/// Number of tasks served from the cache (also counted in `success_count`).
pub cached_count: usize,
/// Per-task results in the order their completion was observed.
pub task_results: Vec<TaskResult>,
/// Wall-clock time of the whole run.
pub total_duration: Duration,
}
impl RunResult {
    /// Reports whether the run finished without a single failed task.
    ///
    /// Only `failure_count` is consulted, so skipped tasks do not make a run
    /// unsuccessful.
    pub fn is_success(&self) -> bool {
        matches!(self.failure_count, 0)
    }
}
/// Message sent over the runner's internal channel to report task progress
/// back to the scheduling loop.
#[derive(Debug)]
enum TaskEvent {
/// A task finished (successfully, unsuccessfully, or via a cache hit).
Completed { task_id: TaskId, result: TaskResult },
}
/// Palette used for the `[project]` output prefix; the runner assigns colours
/// by project index modulo the palette length, so they repeat after six
/// projects.
const PROJECT_COLORS: [Color; 6] = [
Color::Cyan,
Color::Green,
Color::Yellow,
Color::Blue,
Color::Magenta,
Color::Red,
];
/// Executes the tasks of a `TaskGraph` as their dependencies complete, with a
/// bounded number of tasks in flight and optional result caching.
#[derive(Debug)]
pub struct TaskRunner {
/// Maximum number of tasks in flight at any time (at least 1).
concurrency: usize,
/// What to do after a task fails.
run_mode: RunMode,
/// Fallback working directory for tasks whose project root is unknown.
working_dir: PathBuf,
/// Optional cache, shared with the spawned tasks that write results to it.
cache: Option<Arc<Mutex<Cache>>>,
}
impl TaskRunner {
    /// Creates a runner that keeps at most `concurrency` tasks in flight.
    ///
    /// A `concurrency` of `0` is raised to `1`. `working_dir` is the directory
    /// used for a task when its project root cannot be determined. The runner
    /// starts with the default [`RunMode`] and without a cache.
    pub fn new(concurrency: usize, working_dir: PathBuf) -> Self {
        Self {
            // A limit of zero would never schedule anything, so clamp to one.
            concurrency: concurrency.max(1),
            run_mode: RunMode::default(),
            working_dir,
            cache: None,
        }
    }

    /// Sets the failure policy and returns the runner (builder style).
    pub fn with_run_mode(mut self, mode: RunMode) -> Self {
        self.run_mode = mode;
        self
    }

    /// Enables caching with `cache` and returns the runner (builder style).
    pub fn with_cache(mut self, cache: Cache) -> Self {
        self.cache = Some(Arc::new(Mutex::new(cache)));
        self
    }

    /// Returns the output colour assigned to the project owning `task_id`,
    /// falling back to white for projects missing from `color_map`.
    fn project_color(color_map: &HashMap<String, Color>, task_id: &TaskId) -> Color {
        color_map
            .get(&task_id.project().to_string())
            .copied()
            .unwrap_or(Color::White)
    }

    /// Runs every task of `task_graph`, starting tasks as the graph reports
    /// them ready and keeping at most `concurrency` of them in flight.
    ///
    /// Commands and working directories are resolved from `project_graph`.
    /// When a cache is configured, a task whose input hash is already cached is
    /// reported as a successful cache hit without being executed, and the hash
    /// of every successfully executed task is written back to the cache.
    ///
    /// In [`RunMode::FailFast`] no new task is started after the first failure;
    /// tasks that are already running are still awaited. Tasks that never ran
    /// are reported through `skipped_count`.
    ///
    /// A scheduled task for which no command can be resolved is reported as a
    /// failed task instead of being silently dropped.
    ///
    /// # Errors
    ///
    /// The body shown here has no failing path of its own: task failures are
    /// reported through the returned [`RunResult`], not through `Err`.
    ///
    /// # Panics
    ///
    /// Panics if one of the internal mutexes was poisoned by a panicking task.
    pub async fn run(
        &self,
        mut task_graph: TaskGraph,
        project_graph: &ProjectGraph,
    ) -> Result<RunResult, RunnerError> {
        let start_time = Instant::now();
        let total_tasks = task_graph.len();
        if total_tasks == 0 {
            return Ok(RunResult {
                success_count: 0,
                failure_count: 0,
                skipped_count: 0,
                cached_count: 0,
                task_results: vec![],
                total_duration: start_time.elapsed(),
            });
        }

        // Assign every project a colour, cycling through the palette.
        let mut color_map: HashMap<String, Color> = HashMap::new();
        for (idx, name) in project_graph.project_names().enumerate() {
            color_map.insert(name.to_string(), PROJECT_COLORS[idx % PROJECT_COLORS.len()]);
        }

        let mut project_roots: HashMap<String, PathBuf> = HashMap::new();
        for name in project_graph.project_names() {
            if let Some(project) = project_graph.get(name) {
                project_roots.insert(name.to_string(), project.root().to_path_buf());
            }
        }

        // Resolve command, working directory and target config up front so the
        // scheduling loop only performs map lookups.
        let mut task_commands: HashMap<TaskId, (String, PathBuf, TargetConfig)> = HashMap::new();
        for task_id in task_graph.tasks() {
            let project_name = task_id.project().to_string();
            let target_name = task_id.target();
            if let Some(project) = project_graph.get(task_id.project())
                && let Some(target) = project.targets().get(target_name)
            {
                let root = project_roots
                    .get(&project_name)
                    .cloned()
                    .unwrap_or_else(|| self.working_dir.clone());
                task_commands.insert(
                    task_id.clone(),
                    (target.command().to_string(), root, target.clone()),
                );
            }
        }

        // Input hashes of finished tasks; they feed into the hashes of their
        // dependents.
        let completed_hashes: Arc<Mutex<HashMap<TaskId, String>>> =
            Arc::new(Mutex::new(HashMap::new()));

        // Some events (cache hits, unresolved commands) are sent from this very
        // task, which is also the only receiver. With a bounded channel such a
        // send could block forever once the buffer is full. Every in-flight
        // task sends exactly one event, so at most `concurrency` events are
        // ever queued and the unbounded channel cannot grow without limit.
        let (tx, mut rx) = mpsc::unbounded_channel::<TaskEvent>();

        let mut task_results: Vec<TaskResult> = Vec::new();
        let mut success_count = 0usize;
        let mut failure_count = 0usize;
        let mut cached_count = 0usize;
        let mut running_count = 0usize;
        let mut should_stop = false;

        loop {
            if !should_stop {
                let ready: Vec<TaskId> = task_graph
                    .ready_tasks()
                    .iter()
                    .map(|t| (*t).clone())
                    .collect();
                for task_id in ready {
                    if running_count >= self.concurrency {
                        break;
                    }
                    if task_graph.mark_running(&task_id).is_err() {
                        continue;
                    }
                    running_count += 1;

                    let color = Self::project_color(&color_map, &task_id);
                    let prefix = format!("[{}]", task_id.project()).color(color).bold();

                    let Some((command, cwd, target_config)) = task_commands.get(&task_id).cloned()
                    else {
                        // The task already counts as running, so it must
                        // produce a completion event; otherwise the loop below
                        // would wait for it forever.
                        eprintln!(
                            "{prefix} {}: no command could be resolved for this task",
                            task_id.target()
                        );
                        let result = TaskResult {
                            task_id: task_id.clone(),
                            success: false,
                            exit_code: None,
                            duration: Duration::ZERO,
                            cached: false,
                        };
                        let _ = tx.send(TaskEvent::Completed { task_id, result });
                        continue;
                    };

                    let tx = tx.clone();
                    let mut input_hash: Option<String> = None;
                    let mut cache_hit = false;
                    if let Some(ref cache) = self.cache {
                        let dep_hashes: Vec<String> = {
                            let hashes = completed_hashes
                                .lock()
                                .expect("completed-hash mutex poisoned");
                            task_graph
                                .dependencies_of(&task_id)
                                .map(|deps| {
                                    deps.iter()
                                        .filter_map(|dep| hashes.get(dep).cloned())
                                        .collect()
                                })
                                .unwrap_or_default()
                        };
                        // Both guards are released at the end of this block,
                        // before anything is awaited.
                        let mut cache_guard = cache.lock().expect("cache mutex poisoned");
                        if let Ok(hash) = cache_guard.compute_input_hash(
                            &command,
                            &cwd,
                            target_config.inputs(),
                            &dep_hashes,
                        ) {
                            input_hash = Some(hash.clone());
                            if cache_guard.check(&task_id, &hash).is_some() {
                                cache_hit = true;
                            }
                        }
                    }

                    if cache_hit {
                        println!(
                            "{prefix} {} {} {}",
                            "✓".green(),
                            task_id.target(),
                            "[cached]".cyan()
                        );
                        if let Some(ref hash) = input_hash {
                            completed_hashes
                                .lock()
                                .expect("completed-hash mutex poisoned")
                                .insert(task_id.clone(), hash.clone());
                        }
                        let result = TaskResult {
                            task_id: task_id.clone(),
                            success: true,
                            exit_code: Some(0),
                            duration: Duration::ZERO,
                            cached: true,
                        };
                        let _ = tx.send(TaskEvent::Completed { task_id, result });
                    } else {
                        println!("{prefix} Starting {}", task_id.target());
                        let cache_clone = self.cache.clone();
                        let completed_hashes_clone = completed_hashes.clone();
                        tokio::spawn(async move {
                            let result = run_task(&task_id, &command, &cwd, color).await;
                            // Publish the hash before the completion event so
                            // dependents scheduled afterwards can see it.
                            if let Some(ref hash) = input_hash {
                                completed_hashes_clone
                                    .lock()
                                    .expect("completed-hash mutex poisoned")
                                    .insert(task_id.clone(), hash.clone());
                            }
                            if result.success
                                && let (Some(cache), Some(hash)) = (&cache_clone, &input_hash)
                            {
                                // Cache writes are best effort: a failed write
                                // only costs a future cache hit.
                                let _ = cache.lock().expect("cache mutex poisoned").write(
                                    &task_id,
                                    hash.clone(),
                                    true,
                                    command.clone(),
                                );
                            }
                            // The receiver may already be gone; nothing to do then.
                            let _ = tx.send(TaskEvent::Completed { task_id, result });
                        });
                    }
                }
            }

            // Nothing in flight and nothing new could be started: we are done.
            if running_count == 0 {
                break;
            }
            let Some(event) = rx.recv().await else {
                break;
            };
            match event {
                TaskEvent::Completed { task_id, result } => {
                    let color = Self::project_color(&color_map, &task_id);
                    let prefix = format!("[{}]", task_id.project()).color(color).bold();
                    if result.cached {
                        // The cache-hit line was already printed when scheduling.
                        cached_count += 1;
                        success_count += 1;
                    } else if result.success {
                        println!(
                            "{prefix} {} {} in {:.2}s",
                            "✓".green(),
                            task_id.target(),
                            result.duration.as_secs_f64()
                        );
                        success_count += 1;
                    } else {
                        let exit_info = result
                            .exit_code
                            .map(|c| format!(" (exit code {c})"))
                            .unwrap_or_default();
                        eprintln!(
                            "{prefix} {} {} failed{exit_info} in {:.2}s",
                            "✗".red(),
                            task_id.target(),
                            result.duration.as_secs_f64()
                        );
                        failure_count += 1;
                        if self.run_mode == RunMode::FailFast {
                            should_stop = true;
                        }
                    }
                    task_results.push(result);
                    running_count = running_count.saturating_sub(1);
                    // NOTE(review): failed tasks are marked complete as well, so
                    // in `Continue` mode their dependents may still be scheduled
                    // unless `TaskGraph` guards against that — confirm.
                    let _ = task_graph.mark_complete(&task_id);
                }
            }
        }

        let completed_count = success_count + failure_count;
        let skipped_count = total_tasks.saturating_sub(completed_count);
        Ok(RunResult {
            success_count,
            failure_count,
            skipped_count,
            cached_count,
            task_results,
            total_duration: start_time.elapsed(),
        })
    }
}
async fn run_task(task_id: &TaskId, command: &str, cwd: &PathBuf, color: Color) -> TaskResult {
let start = Instant::now();
let mut cmd = Command::new("sh");
cmd.arg("-c").arg(command).current_dir(cwd);
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
let spawn_result = cmd.spawn();
match spawn_result {
Ok(mut child) => {
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let prefix = format!("[{}]", task_id.project()).color(color);
let stdout_prefix = prefix.clone();
let stdout_handle = tokio::spawn(async move {
if let Some(stdout) = stdout {
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
println!("{stdout_prefix} {line}");
}
}
});
let stderr_prefix = prefix;
let stderr_handle = tokio::spawn(async move {
if let Some(stderr) = stderr {
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
eprintln!("{stderr_prefix} {line}");
}
}
});
let status = child.wait().await;
let _ = stdout_handle.await;
let _ = stderr_handle.await;
let duration = start.elapsed();
match status {
Ok(status) => TaskResult {
task_id: task_id.clone(),
success: status.success(),
exit_code: status.code(),
duration,
cached: false,
},
Err(_) => TaskResult {
task_id: task_id.clone(),
success: false,
exit_code: None,
duration,
cached: false,
},
}
}
Err(_) => TaskResult {
task_id: task_id.clone(),
success: false,
exit_code: None,
duration: start.elapsed(),
cached: false,
},
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{ProjectConfig, TargetName};

    /// Builds a project rooted at `/tmp` from an in-memory TOML description.
    ///
    /// `deps` are the project-level `depends_on` entries; each target is a
    /// `(name, command, depends_on)` triple.
    fn make_project(name: &str, deps: &[&str], targets: &[(&str, &str, &[&str])]) -> ProjectConfig {
        let deps_str = if deps.is_empty() {
            String::new()
        } else {
            let dep_list: Vec<String> = deps.iter().map(|d| format!("\"{d}\"")).collect();
            format!("depends_on = [{}]", dep_list.join(", "))
        };
        let targets_str: String = targets
            .iter()
            .map(|(target_name, cmd, target_deps)| {
                let target_deps_str = if target_deps.is_empty() {
                    String::new()
                } else {
                    let dep_list: Vec<String> =
                        target_deps.iter().map(|d| format!("\"{d}\"")).collect();
                    format!("depends_on = [{}]", dep_list.join(", "))
                };
                format!("[targets.{target_name}]\ncommand = \"{cmd}\"\n{target_deps_str}\n")
            })
            .collect();
        let toml = format!("[project]\nname = \"{name}\"\n{deps_str}\n\n{targets_str}");
        ProjectConfig::from_str(&toml, PathBuf::from("/tmp")).unwrap()
    }

    /// Parses a target name, panicking on invalid input.
    fn tname(s: &str) -> TargetName {
        s.parse().unwrap()
    }

    /// A single succeeding task is counted as one success.
    #[tokio::test]
    async fn test_run_single_task() {
        let projects = vec![make_project("app", &[], &[("build", "echo hello", &[])])];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, PathBuf::from("/tmp"));
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        assert_eq!(result.success_count, 1);
        assert_eq!(result.failure_count, 0);
        assert!(result.is_success());
    }

    /// A command with a non-zero exit status is counted as a failure.
    #[tokio::test]
    async fn test_run_failing_task() {
        let projects = vec![make_project("app", &[], &[("build", "false", &[])])];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, PathBuf::from("/tmp"));
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        assert_eq!(result.success_count, 0);
        assert_eq!(result.failure_count, 1);
        assert!(!result.is_success());
    }

    /// A dependency completes before the task that depends on it.
    #[tokio::test]
    async fn test_dependency_ordering() {
        let projects = vec![
            make_project("app", &["lib"], &[("build", "echo app", &["^build"])]),
            make_project("lib", &[], &[("build", "echo lib", &[])]),
        ];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, PathBuf::from("/tmp"));
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        assert_eq!(result.success_count, 2);
        assert_eq!(result.failure_count, 0);
        let lib_idx = result
            .task_results
            .iter()
            .position(|r| r.task_id.project().as_str() == "lib")
            .unwrap();
        let app_idx = result
            .task_results
            .iter()
            .position(|r| r.task_id.project().as_str() == "app")
            .unwrap();
        assert!(lib_idx < app_idx, "lib should complete before app");
    }

    /// Independent tasks overlap in time instead of running one after another.
    #[tokio::test]
    async fn test_parallel_independent_tasks() {
        let projects = vec![
            make_project("a", &[], &[("build", "sleep 0.1 && echo a", &[])]),
            make_project("b", &[], &[("build", "sleep 0.1 && echo b", &[])]),
            make_project("c", &[], &[("build", "sleep 0.1 && echo c", &[])]),
        ];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, PathBuf::from("/tmp"));
        let start = Instant::now();
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        let duration = start.elapsed();
        assert_eq!(result.success_count, 3);
        assert!(
            duration.as_secs_f64() < 0.5,
            "Tasks should run in parallel, took {:.2}s",
            duration.as_secs_f64()
        );
    }

    /// In fail-fast mode the dependent of a failed task is never started.
    #[tokio::test]
    async fn test_fail_fast_mode() {
        let projects = vec![
            make_project("a", &[], &[("build", "false", &[])]),
            make_project("b", &["a"], &[("build", "echo b", &["^build"])]),
        ];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, PathBuf::from("/tmp")).with_run_mode(RunMode::FailFast);
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        assert_eq!(result.failure_count, 1);
        assert_eq!(result.skipped_count, 1);
    }

    /// In continue mode an unrelated task still runs after another one failed.
    #[tokio::test]
    async fn test_continue_mode() {
        let projects = vec![
            make_project("a", &[], &[("build", "false", &[])]),
            make_project("b", &[], &[("build", "echo b", &[])]),
        ];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, PathBuf::from("/tmp")).with_run_mode(RunMode::Continue);
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        assert_eq!(result.success_count, 1);
        assert_eq!(result.failure_count, 1);
    }

    /// Requesting a target that no project defines yields an empty, successful run.
    #[tokio::test]
    async fn test_empty_graph() {
        let projects = vec![make_project("app", &[], &[("lint", "echo lint", &[])])];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, PathBuf::from("/tmp"));
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        assert_eq!(result.success_count, 0);
        assert_eq!(result.failure_count, 0);
        assert!(result.is_success());
    }

    /// With a limit of two, four 50ms tasks all succeed and need at least two
    /// rounds.
    #[tokio::test]
    async fn test_concurrency_limit() {
        let projects = vec![
            make_project("a", &[], &[("build", "sleep 0.05", &[])]),
            make_project("b", &[], &[("build", "sleep 0.05", &[])]),
            make_project("c", &[], &[("build", "sleep 0.05", &[])]),
            make_project("d", &[], &[("build", "sleep 0.05", &[])]),
        ];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(2, PathBuf::from("/tmp"));
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        assert_eq!(result.success_count, 4);
        // The third task can only start after one of the first two finished,
        // so an enforced limit implies roughly two sleeps back to back. The
        // bound leaves a little slack below the theoretical 100ms.
        assert!(
            result.total_duration >= Duration::from_millis(90),
            "concurrency limit of 2 was not enforced, run took {:?}",
            result.total_duration
        );
    }

    /// In a diamond-shaped graph the shared root runs first and the sink last.
    #[tokio::test]
    async fn test_diamond_dependency() {
        let projects = vec![
            make_project(
                "app",
                &["lib-a", "lib-b"],
                &[("build", "echo app", &["^build"])],
            ),
            make_project("lib-a", &["core"], &[("build", "echo lib-a", &["^build"])]),
            make_project("lib-b", &["core"], &[("build", "echo lib-b", &["^build"])]),
            make_project("core", &[], &[("build", "echo core", &[])]),
        ];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, PathBuf::from("/tmp"));
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        assert_eq!(result.success_count, 4);
        let core_idx = result
            .task_results
            .iter()
            .position(|r| r.task_id.project().as_str() == "core")
            .unwrap();
        let app_idx = result
            .task_results
            .iter()
            .position(|r| r.task_id.project().as_str() == "app")
            .unwrap();
        assert_eq!(core_idx, 0, "core should complete first");
        assert_eq!(app_idx, 3, "app should complete last");
    }

    /// A second run with unchanged inputs is served from the cache.
    #[tokio::test]
    async fn test_cache_hit_skips_execution() {
        use tempfile::TempDir;
        let temp = TempDir::new().unwrap();
        let project_dir = temp.path().join("app");
        std::fs::create_dir_all(&project_dir).unwrap();
        let toml = r#"[project]
name = "app"
[targets.build]
command = "echo hello"
inputs = []
"#;
        let project = ProjectConfig::from_str(toml, project_dir.clone()).unwrap();
        let projects = vec![project];
        let project_graph = ProjectGraph::build(projects).unwrap();

        // First run: nothing is cached yet, so the task is executed.
        let cache = Cache::new(temp.path());
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, temp.path().to_path_buf()).with_cache(cache);
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        assert_eq!(result.success_count, 1);
        assert_eq!(result.cached_count, 0);

        // Second run with a fresh `Cache` over the same directory: cache hit.
        let cache = Cache::new(temp.path());
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, temp.path().to_path_buf()).with_cache(cache);
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        assert_eq!(result.success_count, 1);
        assert_eq!(result.cached_count, 1);
        assert!(result.task_results[0].cached);
    }

    /// Modifying a declared input file makes the next run miss the cache.
    #[tokio::test]
    async fn test_changed_input_invalidates_cache() {
        use tempfile::TempDir;
        let temp = TempDir::new().unwrap();
        let project_dir = temp.path().join("app");
        let src_dir = project_dir.join("src");
        std::fs::create_dir_all(&src_dir).unwrap();
        std::fs::write(src_dir.join("main.rs"), "fn main() {}").unwrap();
        let toml = r#"[project]
name = "app"
[targets.build]
command = "echo built"
inputs = ["src/**/*.rs"]
"#;
        let project = ProjectConfig::from_str(toml, project_dir.clone()).unwrap();
        let projects = vec![project];
        let project_graph = ProjectGraph::build(projects).unwrap();

        let cache = Cache::new(temp.path());
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, temp.path().to_path_buf()).with_cache(cache);
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        assert_eq!(result.cached_count, 0);

        // Change the input so its hash no longer matches the cached entry.
        std::fs::write(
            src_dir.join("main.rs"),
            "fn main() { println!(\"modified\"); }",
        )
        .unwrap();

        let cache = Cache::new(temp.path());
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, temp.path().to_path_buf()).with_cache(cache);
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        assert_eq!(result.cached_count, 0);
    }
}