1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4use std::time::{Duration, Instant};
5
6use colored::{Color, Colorize};
7use tokio::io::{AsyncBufReadExt, BufReader};
8use tokio::process::Command;
9use tokio::sync::mpsc;
10
11use crate::cache::Cache;
12use crate::config::TargetConfig;
13use crate::error::RunnerError;
14use crate::graph::{ProjectGraph, TaskGraph, TaskId};
15
/// Policy for what the runner does after a task fails.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum RunMode {
    /// Stop scheduling new tasks after the first failure (the default).
    #[default]
    FailFast,
    /// Keep scheduling tasks even after a failure has been recorded.
    Continue,
}
25
26#[derive(Debug, Clone)]
28pub struct TaskResult {
29 pub task_id: TaskId,
31 pub success: bool,
33 pub exit_code: Option<i32>,
35 pub duration: Duration,
37 pub cached: bool,
39}
40
41#[derive(Debug, Clone)]
43pub struct RunResult {
44 pub success_count: usize,
46 pub failure_count: usize,
48 pub skipped_count: usize,
50 pub cached_count: usize,
52 pub task_results: Vec<TaskResult>,
54 pub total_duration: Duration,
56}
57
58impl RunResult {
59 pub fn is_success(&self) -> bool {
61 self.failure_count == 0
62 }
63}
64
65#[derive(Debug)]
67enum TaskEvent {
68 Completed { task_id: TaskId, result: TaskResult },
70}
71
72const PROJECT_COLORS: [Color; 6] = [
74 Color::Cyan,
75 Color::Green,
76 Color::Yellow,
77 Color::Blue,
78 Color::Magenta,
79 Color::Red,
80];
81
82#[derive(Debug)]
84pub struct TaskRunner {
85 concurrency: usize,
87 run_mode: RunMode,
89 working_dir: PathBuf,
91 cache: Option<Arc<Mutex<Cache>>>,
93}
94
95impl TaskRunner {
96 pub fn new(concurrency: usize, working_dir: PathBuf) -> Self {
98 Self {
99 concurrency: concurrency.max(1),
100 run_mode: RunMode::default(),
101 working_dir,
102 cache: None,
103 }
104 }
105
106 pub fn with_run_mode(mut self, mode: RunMode) -> Self {
108 self.run_mode = mode;
109 self
110 }
111
112 pub fn with_cache(mut self, cache: Cache) -> Self {
114 self.cache = Some(Arc::new(Mutex::new(cache)));
115 self
116 }
117
118 pub async fn run(
120 &self,
121 mut task_graph: TaskGraph,
122 project_graph: &ProjectGraph,
123 ) -> Result<RunResult, RunnerError> {
124 let start_time = Instant::now();
125 let total_tasks = task_graph.len();
126
127 if total_tasks == 0 {
128 return Ok(RunResult {
129 success_count: 0,
130 failure_count: 0,
131 skipped_count: 0,
132 cached_count: 0,
133 task_results: vec![],
134 total_duration: start_time.elapsed(),
135 });
136 }
137
138 let mut color_map: HashMap<String, Color> = HashMap::new();
140 for (idx, name) in project_graph.project_names().enumerate() {
141 color_map.insert(name.to_string(), PROJECT_COLORS[idx % PROJECT_COLORS.len()]);
142 }
143
144 let mut project_roots: HashMap<String, PathBuf> = HashMap::new();
146 for name in project_graph.project_names() {
147 if let Some(project) = project_graph.get(name) {
148 project_roots.insert(name.to_string(), project.root().to_path_buf());
149 }
150 }
151
152 let mut task_commands: HashMap<TaskId, (String, PathBuf, TargetConfig)> = HashMap::new();
154 for task_id in task_graph.tasks() {
155 let project_name = task_id.project().to_string();
156 let target_name = task_id.target();
157 if let Some(project) = project_graph.get(task_id.project())
158 && let Some(target) = project.targets().get(target_name)
159 {
160 let root = project_roots
161 .get(&project_name)
162 .cloned()
163 .unwrap_or_else(|| self.working_dir.clone());
164 task_commands.insert(
165 task_id.clone(),
166 (target.command().to_string(), root, target.clone()),
167 );
168 }
169 }
170
171 let completed_hashes: Arc<Mutex<HashMap<TaskId, String>>> =
173 Arc::new(Mutex::new(HashMap::new()));
174
175 let (tx, mut rx) = mpsc::channel::<TaskEvent>(100);
177
178 let mut task_results: Vec<TaskResult> = Vec::new();
179 let mut success_count = 0usize;
180 let mut failure_count = 0usize;
181 let mut cached_count = 0usize;
182 let mut running_count = 0usize;
183 let mut should_stop = false;
184
185 loop {
187 if !should_stop {
189 let ready: Vec<TaskId> = task_graph
190 .ready_tasks()
191 .iter()
192 .map(|t| (*t).clone())
193 .collect();
194
195 for task_id in ready {
196 if running_count >= self.concurrency {
197 break;
198 }
199
200 if task_graph.mark_running(&task_id).is_err() {
202 continue;
203 }
204
205 running_count += 1;
206
207 let Some((command, cwd, target_config)) = task_commands.get(&task_id).cloned()
209 else {
210 continue;
211 };
212
213 let tx = tx.clone();
214 let color = color_map
215 .get(&task_id.project().to_string())
216 .copied()
217 .unwrap_or(Color::White);
218
219 let prefix = format!("[{}]", task_id.project()).color(color).bold();
220
221 let mut input_hash: Option<String> = None;
223 let mut cache_hit = false;
224
225 if let Some(ref cache) = self.cache {
226 let dep_hashes: Vec<String> = {
228 let hashes = completed_hashes.lock().unwrap();
229 task_graph
230 .dependencies_of(&task_id)
231 .map(|deps| {
232 deps.iter()
233 .filter_map(|dep| hashes.get(dep).cloned())
234 .collect()
235 })
236 .unwrap_or_default()
237 };
238
239 let mut cache_guard = cache.lock().unwrap();
241 if let Ok(hash) = cache_guard.compute_input_hash(
242 &command,
243 &cwd,
244 target_config.inputs(),
245 &dep_hashes,
246 ) {
247 input_hash = Some(hash.clone());
248
249 if let Some(_entry) = cache_guard.check(&task_id, &hash) {
251 cache_hit = true;
252 }
253 }
254 }
255
256 if cache_hit {
257 println!(
258 "{prefix} {} {} {}",
259 "✓".green(),
260 task_id.target(),
261 "[cached]".cyan()
262 );
263
264 if let Some(ref hash) = input_hash {
266 completed_hashes
267 .lock()
268 .unwrap()
269 .insert(task_id.clone(), hash.clone());
270 }
271
272 let result = TaskResult {
274 task_id: task_id.clone(),
275 success: true,
276 exit_code: Some(0),
277 duration: Duration::ZERO,
278 cached: true,
279 };
280 let _ = tx.send(TaskEvent::Completed { task_id, result }).await;
281 } else {
282 println!("{prefix} Starting {}", task_id.target());
283
284 let cache_clone = self.cache.clone();
286 let completed_hashes_clone = completed_hashes.clone();
287 tokio::spawn(async move {
288 let result = run_task(&task_id, &command, &cwd, color).await;
289
290 if let Some(ref hash) = input_hash {
292 completed_hashes_clone
293 .lock()
294 .unwrap()
295 .insert(task_id.clone(), hash.clone());
296 }
297
298 if result.success
300 && let (Some(cache), Some(hash)) = (&cache_clone, &input_hash)
301 {
302 let _ = cache.lock().unwrap().write(
303 &task_id,
304 hash.clone(),
305 true,
306 command.clone(),
307 );
308 }
309
310 let _ = tx.send(TaskEvent::Completed { task_id, result }).await;
311 });
312 }
313 }
314 }
315
316 if running_count == 0 {
318 break;
319 }
320
321 let Some(event) = rx.recv().await else {
323 break;
324 };
325
326 match event {
327 TaskEvent::Completed { task_id, result } => {
328 let color = color_map
329 .get(&task_id.project().to_string())
330 .copied()
331 .unwrap_or(Color::White);
332 let prefix = format!("[{}]", task_id.project()).color(color).bold();
333
334 if result.cached {
335 cached_count += 1;
337 success_count += 1;
338 } else if result.success {
339 println!(
340 "{prefix} {} {} in {:.2}s",
341 "✓".green(),
342 task_id.target(),
343 result.duration.as_secs_f64()
344 );
345 success_count += 1;
346 } else {
347 let exit_info = result
348 .exit_code
349 .map(|c| format!(" (exit code {c})"))
350 .unwrap_or_default();
351 eprintln!(
352 "{prefix} {} {} failed{exit_info} in {:.2}s",
353 "✗".red(),
354 task_id.target(),
355 result.duration.as_secs_f64()
356 );
357 failure_count += 1;
358
359 if self.run_mode == RunMode::FailFast {
361 should_stop = true;
362 }
363 }
364
365 task_results.push(result);
366 running_count = running_count.saturating_sub(1);
367
368 let _ = task_graph.mark_complete(&task_id);
370 }
371 }
372 }
373
374 let completed_count = success_count + failure_count;
375 let skipped_count = total_tasks - completed_count;
376
377 Ok(RunResult {
378 success_count,
379 failure_count,
380 skipped_count,
381 cached_count,
382 task_results,
383 total_duration: start_time.elapsed(),
384 })
385 }
386}
387
388async fn run_task(task_id: &TaskId, command: &str, cwd: &PathBuf, color: Color) -> TaskResult {
390 let start = Instant::now();
391
392 let mut cmd = Command::new("sh");
394 cmd.arg("-c").arg(command).current_dir(cwd);
395
396 cmd.stdout(std::process::Stdio::piped());
397 cmd.stderr(std::process::Stdio::piped());
398
399 let spawn_result = cmd.spawn();
400
401 match spawn_result {
402 Ok(mut child) => {
403 let stdout = child.stdout.take();
405 let stderr = child.stderr.take();
406
407 let prefix = format!("[{}]", task_id.project()).color(color);
408
409 let stdout_prefix = prefix.clone();
410 let stdout_handle = tokio::spawn(async move {
411 if let Some(stdout) = stdout {
412 let reader = BufReader::new(stdout);
413 let mut lines = reader.lines();
414 while let Ok(Some(line)) = lines.next_line().await {
415 println!("{stdout_prefix} {line}");
416 }
417 }
418 });
419
420 let stderr_prefix = prefix;
421 let stderr_handle = tokio::spawn(async move {
422 if let Some(stderr) = stderr {
423 let reader = BufReader::new(stderr);
424 let mut lines = reader.lines();
425 while let Ok(Some(line)) = lines.next_line().await {
426 eprintln!("{stderr_prefix} {line}");
427 }
428 }
429 });
430
431 let status = child.wait().await;
433
434 let _ = stdout_handle.await;
436 let _ = stderr_handle.await;
437
438 let duration = start.elapsed();
439
440 match status {
441 Ok(status) => TaskResult {
442 task_id: task_id.clone(),
443 success: status.success(),
444 exit_code: status.code(),
445 duration,
446 cached: false,
447 },
448 Err(_) => TaskResult {
449 task_id: task_id.clone(),
450 success: false,
451 exit_code: None,
452 duration,
453 cached: false,
454 },
455 }
456 }
457 Err(_) => TaskResult {
458 task_id: task_id.clone(),
459 success: false,
460 exit_code: None,
461 duration: start.elapsed(),
462 cached: false,
463 },
464 }
465}
466
#[cfg(test)]
mod tests {
    //! Integration-style tests that run real shell commands through
    //! [`TaskRunner`].

    use super::*;
    use crate::config::{ProjectConfig, TargetName};

    /// Builds a [`ProjectConfig`] rooted at `/tmp` from a generated TOML
    /// document.
    ///
    /// `deps` lists project-level dependencies; each entry of `targets` is
    /// `(target name, command, target-level depends_on)`.
    fn make_project(name: &str, deps: &[&str], targets: &[(&str, &str, &[&str])]) -> ProjectConfig {
        let deps_str = if deps.is_empty() {
            String::new()
        } else {
            let dep_list: Vec<String> = deps.iter().map(|d| format!("\"{d}\"")).collect();
            format!("depends_on = [{}]", dep_list.join(", "))
        };

        let targets_str: String = targets
            .iter()
            .map(|(target_name, cmd, target_deps)| {
                let target_deps_str = if target_deps.is_empty() {
                    String::new()
                } else {
                    let dep_list: Vec<String> =
                        target_deps.iter().map(|d| format!("\"{d}\"")).collect();
                    format!("depends_on = [{}]", dep_list.join(", "))
                };
                format!("[targets.{target_name}]\ncommand = \"{cmd}\"\n{target_deps_str}\n")
            })
            .collect();

        let toml = format!("[project]\nname = \"{name}\"\n{deps_str}\n\n{targets_str}");
        ProjectConfig::from_str(&toml, PathBuf::from("/tmp")).unwrap()
    }

    /// Parses a target name, panicking on invalid input.
    fn tname(s: &str) -> TargetName {
        s.parse().unwrap()
    }

    #[tokio::test]
    async fn test_run_single_task() {
        let projects = vec![make_project("app", &[], &[("build", "echo hello", &[])])];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();

        let runner = TaskRunner::new(4, PathBuf::from("/tmp"));
        let result = runner.run(task_graph, &project_graph).await.unwrap();

        assert_eq!(result.success_count, 1);
        assert_eq!(result.failure_count, 0);
        assert!(result.is_success());
    }

    #[tokio::test]
    async fn test_run_failing_task() {
        let projects = vec![make_project("app", &[], &[("build", "false", &[])])];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();

        let runner = TaskRunner::new(4, PathBuf::from("/tmp"));
        let result = runner.run(task_graph, &project_graph).await.unwrap();

        assert_eq!(result.success_count, 0);
        assert_eq!(result.failure_count, 1);
        assert!(!result.is_success());
    }

    #[tokio::test]
    async fn test_dependency_ordering() {
        let projects = vec![
            make_project("app", &["lib"], &[("build", "echo app", &["^build"])]),
            make_project("lib", &[], &[("build", "echo lib", &[])]),
        ];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();

        let runner = TaskRunner::new(4, PathBuf::from("/tmp"));
        let result = runner.run(task_graph, &project_graph).await.unwrap();

        assert_eq!(result.success_count, 2);
        assert_eq!(result.failure_count, 0);

        // Results are recorded in completion order.
        let lib_idx = result
            .task_results
            .iter()
            .position(|r| r.task_id.project().as_str() == "lib")
            .unwrap();
        let app_idx = result
            .task_results
            .iter()
            .position(|r| r.task_id.project().as_str() == "app")
            .unwrap();

        assert!(lib_idx < app_idx, "lib should complete before app");
    }

    #[tokio::test]
    async fn test_parallel_independent_tasks() {
        let projects = vec![
            make_project("a", &[], &[("build", "sleep 0.1 && echo a", &[])]),
            make_project("b", &[], &[("build", "sleep 0.1 && echo b", &[])]),
            make_project("c", &[], &[("build", "sleep 0.1 && echo c", &[])]),
        ];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();

        let runner = TaskRunner::new(4, PathBuf::from("/tmp"));
        let start = Instant::now();
        let result = runner.run(task_graph, &project_graph).await.unwrap();
        let duration = start.elapsed();

        assert_eq!(result.success_count, 3);

        // Three sequential 0.1 s sleeps take at least 0.3 s; the bound leaves
        // slack for process start-up.
        assert!(
            duration.as_secs_f64() < 0.5,
            "Tasks should run in parallel, took {:.2}s",
            duration.as_secs_f64()
        );
    }

    #[tokio::test]
    async fn test_fail_fast_mode() {
        let projects = vec![
            make_project("a", &[], &[("build", "false", &[])]),
            make_project("b", &["a"], &[("build", "echo b", &["^build"])]),
        ];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();

        let runner = TaskRunner::new(4, PathBuf::from("/tmp")).with_run_mode(RunMode::FailFast);
        let result = runner.run(task_graph, &project_graph).await.unwrap();

        assert_eq!(result.failure_count, 1);
        assert_eq!(result.skipped_count, 1);
    }

    #[tokio::test]
    async fn test_continue_mode() {
        let projects = vec![
            make_project("a", &[], &[("build", "false", &[])]),
            make_project("b", &[], &[("build", "echo b", &[])]),
        ];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();

        let runner = TaskRunner::new(4, PathBuf::from("/tmp")).with_run_mode(RunMode::Continue);
        let result = runner.run(task_graph, &project_graph).await.unwrap();

        assert_eq!(result.success_count, 1);
        assert_eq!(result.failure_count, 1);
    }

    #[tokio::test]
    async fn test_empty_graph() {
        // The only project defines `lint`, so a `build` graph contains no tasks.
        let projects = vec![make_project("app", &[], &[("lint", "echo lint", &[])])];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();

        let runner = TaskRunner::new(4, PathBuf::from("/tmp"));
        let result = runner.run(task_graph, &project_graph).await.unwrap();

        assert_eq!(result.success_count, 0);
        assert_eq!(result.failure_count, 0);
        assert!(result.is_success());
    }

    #[tokio::test]
    async fn test_concurrency_limit() {
        let projects = vec![
            make_project("a", &[], &[("build", "sleep 0.05", &[])]),
            make_project("b", &[], &[("build", "sleep 0.05", &[])]),
            make_project("c", &[], &[("build", "sleep 0.05", &[])]),
            make_project("d", &[], &[("build", "sleep 0.05", &[])]),
        ];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();

        let runner = TaskRunner::new(2, PathBuf::from("/tmp"));
        let result = runner.run(task_graph, &project_graph).await.unwrap();

        assert_eq!(result.success_count, 4);

        // With two slots, four 50 ms tasks need at least two waves. If the
        // limit were ignored the run could finish in roughly 50 ms.
        assert!(
            result.total_duration >= Duration::from_millis(100),
            "concurrency limit of 2 was not enforced, run took {:.3}s",
            result.total_duration.as_secs_f64()
        );
    }

    #[tokio::test]
    async fn test_diamond_dependency() {
        // core <- lib-a, lib-b <- app
        let projects = vec![
            make_project(
                "app",
                &["lib-a", "lib-b"],
                &[("build", "echo app", &["^build"])],
            ),
            make_project("lib-a", &["core"], &[("build", "echo lib-a", &["^build"])]),
            make_project("lib-b", &["core"], &[("build", "echo lib-b", &["^build"])]),
            make_project("core", &[], &[("build", "echo core", &[])]),
        ];
        let project_graph = ProjectGraph::build(projects).unwrap();
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();

        let runner = TaskRunner::new(4, PathBuf::from("/tmp"));
        let result = runner.run(task_graph, &project_graph).await.unwrap();

        assert_eq!(result.success_count, 4);

        let core_idx = result
            .task_results
            .iter()
            .position(|r| r.task_id.project().as_str() == "core")
            .unwrap();
        let app_idx = result
            .task_results
            .iter()
            .position(|r| r.task_id.project().as_str() == "app")
            .unwrap();

        assert_eq!(core_idx, 0, "core should complete first");
        assert_eq!(app_idx, 3, "app should complete last");
    }

    #[tokio::test]
    async fn test_cache_hit_skips_execution() {
        use tempfile::TempDir;

        let temp = TempDir::new().unwrap();
        let project_dir = temp.path().join("app");
        std::fs::create_dir_all(&project_dir).unwrap();

        let toml = r#"[project]
name = "app"

[targets.build]
command = "echo hello"
inputs = []
"#;
        let project = ProjectConfig::from_str(toml, project_dir.clone()).unwrap();
        let projects = vec![project];
        let project_graph = ProjectGraph::build(projects).unwrap();

        // First run: nothing is cached yet, so the command executes.
        let cache = Cache::new(temp.path());
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, temp.path().to_path_buf()).with_cache(cache);
        let result = runner.run(task_graph, &project_graph).await.unwrap();

        assert_eq!(result.success_count, 1);
        assert_eq!(result.cached_count, 0);

        // Second run with a fresh `Cache` over the same directory: expect a hit.
        let cache = Cache::new(temp.path());
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, temp.path().to_path_buf()).with_cache(cache);
        let result = runner.run(task_graph, &project_graph).await.unwrap();

        assert_eq!(result.success_count, 1);
        assert_eq!(result.cached_count, 1);
        assert!(result.task_results[0].cached);
    }

    #[tokio::test]
    async fn test_changed_input_invalidates_cache() {
        use tempfile::TempDir;

        let temp = TempDir::new().unwrap();
        let project_dir = temp.path().join("app");
        let src_dir = project_dir.join("src");
        std::fs::create_dir_all(&src_dir).unwrap();
        std::fs::write(src_dir.join("main.rs"), "fn main() {}").unwrap();

        let toml = r#"[project]
name = "app"

[targets.build]
command = "echo built"
inputs = ["src/**/*.rs"]
"#;
        let project = ProjectConfig::from_str(toml, project_dir.clone()).unwrap();
        let projects = vec![project];
        let project_graph = ProjectGraph::build(projects).unwrap();

        // First run populates the cache.
        let cache = Cache::new(temp.path());
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, temp.path().to_path_buf()).with_cache(cache);
        let result = runner.run(task_graph, &project_graph).await.unwrap();

        assert_eq!(result.cached_count, 0);

        // Change a tracked input so the second run must miss the cache.
        std::fs::write(
            src_dir.join("main.rs"),
            "fn main() { println!(\"modified\"); }",
        )
        .unwrap();

        let cache = Cache::new(temp.path());
        let task_graph = TaskGraph::build(&project_graph, &tname("build")).unwrap();
        let runner = TaskRunner::new(4, temp.path().to_path_buf()).with_cache(cache);
        let result = runner.run(task_graph, &project_graph).await.unwrap();

        assert_eq!(result.cached_count, 0);
    }
}