1use anyhow::Result;
2
3use futures::{FutureExt, future::BoxFuture};
4use log::debug;
5use std::{
6 cell::RefCell,
7 collections::HashMap,
8 future::Future,
9 sync::{Arc, atomic::Ordering},
10};
11use tokio::sync::Notify;
12
13use crate::{
14 logger::ui::{FullscreenRenderer, InlineRenderer, Renderer},
15 shareable,
16};
17
18use super::{
19 Command, CommandGraph, CommandInner, CommandRuntime, Workspace, build_command_graph,
20 dep_graph::DepGraph,
21};
22
23#[atomic_enum::atomic_enum]
24#[derive(PartialEq)]
25enum TaskStatus {
26 Pending = 0,
27 Running,
28 Finished,
29}
30
31type TaskFuture = Box<dyn FnOnce() -> BoxFuture<'static, (Result<()>, Task)>>;
32
33pub struct TaskInner {
34 key: String,
35 command: Command,
36 deps: Vec<String>,
37 status: AtomicTaskStatus,
38 can_skip: bool,
39}
40
41shareable!(Task, TaskInner);
42
43impl Task {
44 fn make<F: Future<Output = Result<()>> + Send + 'static>(
45 key: String,
46 command: Command,
47 fut: F,
48 deps: Vec<String>,
49 can_skip: bool,
50 ) -> (Self, TaskFuture) {
51 let task = Task::new(TaskInner {
52 key,
53 command,
54 deps,
55 can_skip,
56 status: AtomicTaskStatus::new(TaskStatus::Pending),
57 });
58 let task2 = task.clone();
59 let boxed_fut = Box::new(move || {
60 async move {
61 let result = fut.await;
62 (result, task2)
63 }
64 .boxed()
65 });
66 (task, boxed_fut)
67 }
68}
69
70impl TaskInner {
71 fn key(&self) -> &str {
72 &self.key
73 }
74
75 fn status(&self) -> TaskStatus {
76 self.status.load(Ordering::SeqCst)
77 }
78}
79
/// Dependency graph whose nodes are `Task` handles.
type TaskGraph = DepGraph<Task>;
81
82impl Workspace {
83 fn spawn_log_thread(
84 &self,
85 log_should_exit: &Arc<Notify>,
86 runner_should_exit: &Arc<Notify>,
87 runtime: Option<CommandRuntime>,
88 ) -> impl Future {
89 let ws = self.clone();
90 let log_should_exit = Arc::clone(log_should_exit);
91 let runner_should_exit = Arc::clone(runner_should_exit);
92 let use_fullscreen_renderer =
93 !ws.common.no_fullscreen && matches!(runtime, Some(CommandRuntime::RunForever));
94 tokio::spawn(async move {
95 let result = if use_fullscreen_renderer {
96 FullscreenRenderer::new()
97 .unwrap()
98 .render_loop(&ws, &log_should_exit)
99 .await
100 } else {
101 InlineRenderer::new()
102 .render_loop(&ws, &log_should_exit)
103 .await
104 };
105 match result {
106 Ok(true) => runner_should_exit.notify_one(),
107 Ok(false) => {}
108 Err(e) => {
109 eprintln!("{e}");
110 runner_should_exit.notify_one();
111 }
112 }
113 })
114 }
115
116 fn build_task_graph(
117 &self,
118 cmd_graph: &CommandGraph,
119 runtime: Option<CommandRuntime>,
120 ) -> (TaskGraph, HashMap<Task, TaskFuture>) {
121 let futures = RefCell::new(HashMap::new());
122 let task_pool = RefCell::new(HashMap::new());
123
124 let tasks_for = |cmd: &Command| -> Vec<Task> {
125 macro_rules! add_task {
126 ($key:expr, $task:expr, $deps:expr, $files:expr) => {{
127 task_pool
128 .borrow_mut()
129 .entry($key.clone())
130 .or_insert_with(|| {
131 let can_skip = self.common.incremental
132 && !matches!(runtime, Some(CommandRuntime::RunForever))
133 && match $files {
134 Some(files) => {
135 let fingerprints = self.fingerprints.read().unwrap();
136 fingerprints.can_skip(&$key, files)
137 }
138 None => false,
139 };
140
141 let (task, future) = Task::make($key, cmd.clone(), $task, $deps, can_skip);
142 futures.borrow_mut().insert(task.clone(), future);
143 task
144 })
145 .clone()
146 }};
147 }
148
149 match &**cmd {
150 CommandInner::Package(pkg_cmd) => self
151 .roots
152 .iter()
153 .flat_map(|pkg| {
154 self.pkg_graph.all_deps_for(pkg).chain([pkg]).map(|pkg| {
155 let pkg = pkg.clone();
156 let key = pkg_cmd.pkg_key(&pkg);
157 let deps = self
158 .pkg_graph
159 .immediate_deps_for(&pkg)
160 .map(|pkg| pkg_cmd.pkg_key(pkg))
161 .collect();
162 let files = pkg.all_files().collect::<Vec<_>>();
163 add_task!(key, cmd.clone().run_pkg(pkg), deps, Some(files))
164 })
165 })
166 .collect(),
167 CommandInner::Workspace(ws_cmd) => {
168 let this = self.clone();
169 let key = ws_cmd.ws_key();
170 let deps = vec![];
171 let files = ws_cmd.input_files(self);
172 vec![add_task!(key, cmd.clone().run_ws(this), deps, files)]
173 }
174 }
175 };
176
177 let task_graph = DepGraph::build(
178 cmd_graph.roots().flat_map(tasks_for).collect(),
179 |t| t.key.clone(),
180 |task: &Task| {
181 let mut deps = cmd_graph
182 .immediate_deps_for(&task.command)
183 .flat_map(tasks_for)
184 .collect::<Vec<_>>();
185 let runtime = task.command.runtime();
186 if let Some(CommandRuntime::WaitForDependencies) = runtime {
187 deps.extend(task.deps.iter().map(|key| task_pool.borrow()[key].clone()));
188 }
189 deps
190 },
191 )
192 .unwrap();
193
194 (task_graph, futures.into_inner())
195 }
196
197 pub async fn run(&self, root: Command) -> Result<()> {
198 let runtime = root.runtime();
199 let cmd_graph = build_command_graph(&root);
200 let (task_graph, mut task_futures) = self.build_task_graph(&cmd_graph, runtime);
201
202 let log_should_exit: Arc<Notify> = Arc::new(Notify::new());
203 let runner_should_exit: Arc<Notify> = Arc::new(Notify::new());
204
205 let runner_should_exit_fut = runner_should_exit.notified();
206 tokio::pin!(runner_should_exit_fut);
207
208 let cleanup_logs = self.spawn_log_thread(&log_should_exit, &runner_should_exit, runtime);
209
210 let mut running_futures = Vec::new();
211 let result = loop {
212 let finished = task_graph
213 .nodes()
214 .all(|task| task.status() == TaskStatus::Finished);
215 if finished {
216 break Ok(());
217 }
218
219 let pending = task_graph
220 .nodes()
221 .filter(|task| task.status() == TaskStatus::Pending);
222 for task in pending {
223 let imm_deps = task_graph.immediate_deps_for(task).collect::<Vec<_>>();
224 let deps_finished = imm_deps
225 .iter()
226 .all(|dep| dep.status() == TaskStatus::Finished);
227 if deps_finished {
228 let can_skip = task.can_skip && imm_deps.iter().all(|dep| dep.can_skip);
229 let task_fut = task_futures.remove(task).unwrap();
230 if can_skip {
231 task.status.store(TaskStatus::Finished, Ordering::SeqCst);
232 } else {
233 debug!("Starting task for: {}", task.key());
234 task.status.store(TaskStatus::Running, Ordering::SeqCst);
235 running_futures.push(tokio::spawn(task_fut()));
236 }
237 }
238 }
239
240 if running_futures.is_empty() {
241 continue;
242 }
243
244 let one_output = futures::future::select_all(&mut running_futures);
245 let (result, idx, _) = tokio::select! { biased;
246 () = &mut runner_should_exit_fut => break Ok(()),
247 output = one_output => output,
248 };
249
250 running_futures.remove(idx);
251
252 let (result, completed_task) = result?;
253
254 if result.is_err() {
255 break result;
256 }
257
258 debug!("Finishing task for: {}", completed_task.key());
259 completed_task
260 .status
261 .store(TaskStatus::Finished, Ordering::SeqCst);
262 self
263 .fingerprints
264 .write()
265 .unwrap()
266 .update_time(completed_task.key().to_string());
267 };
268
269 for fut in &mut running_futures {
270 fut.abort();
271 }
272
273 for fut in &mut running_futures {
274 let _ = fut.await;
275 }
276
277 log::debug!("All tasks complete, waiting for log thread to exit");
278 log_should_exit.notify_one();
279 cleanup_logs.await;
280
281 if root.name() != "clean" {
282 self.fingerprints.read().unwrap().save(&self.root)?;
283 }
284
285 result
286 }
287}