depot_js/workspace/
runner.rs

1use anyhow::Result;
2
3use futures::{FutureExt, future::BoxFuture};
4use log::debug;
5use std::{
6  cell::RefCell,
7  collections::HashMap,
8  future::Future,
9  sync::{Arc, atomic::Ordering},
10};
11use tokio::sync::Notify;
12
13use crate::{
14  logger::ui::{FullscreenRenderer, InlineRenderer, Renderer},
15  shareable,
16};
17
18use super::{
19  Command, CommandGraph, CommandInner, CommandRuntime, Workspace, build_command_graph,
20  dep_graph::DepGraph,
21};
22
23#[atomic_enum::atomic_enum]
24#[derive(PartialEq)]
25enum TaskStatus {
26  Pending = 0,
27  Running,
28  Finished,
29}
30
31type TaskFuture = Box<dyn FnOnce() -> BoxFuture<'static, (Result<()>, Task)>>;
32
33pub struct TaskInner {
34  key: String,
35  command: Command,
36  deps: Vec<String>,
37  status: AtomicTaskStatus,
38  can_skip: bool,
39}
40
41shareable!(Task, TaskInner);
42
43impl Task {
44  fn make<F: Future<Output = Result<()>> + Send + 'static>(
45    key: String,
46    command: Command,
47    fut: F,
48    deps: Vec<String>,
49    can_skip: bool,
50  ) -> (Self, TaskFuture) {
51    let task = Task::new(TaskInner {
52      key,
53      command,
54      deps,
55      can_skip,
56      status: AtomicTaskStatus::new(TaskStatus::Pending),
57    });
58    let task2 = task.clone();
59    let boxed_fut = Box::new(move || {
60      async move {
61        let result = fut.await;
62        (result, task2)
63      }
64      .boxed()
65    });
66    (task, boxed_fut)
67  }
68}
69
70impl TaskInner {
71  fn key(&self) -> &str {
72    &self.key
73  }
74
75  fn status(&self) -> TaskStatus {
76    self.status.load(Ordering::SeqCst)
77  }
78}
79
80type TaskGraph = DepGraph<Task>;
81
82impl Workspace {
83  fn spawn_log_thread(
84    &self,
85    log_should_exit: &Arc<Notify>,
86    runner_should_exit: &Arc<Notify>,
87    runtime: Option<CommandRuntime>,
88  ) -> impl Future {
89    let ws = self.clone();
90    let log_should_exit = Arc::clone(log_should_exit);
91    let runner_should_exit = Arc::clone(runner_should_exit);
92    let use_fullscreen_renderer =
93      !ws.common.no_fullscreen && matches!(runtime, Some(CommandRuntime::RunForever));
94    tokio::spawn(async move {
95      let result = if use_fullscreen_renderer {
96        FullscreenRenderer::new()
97          .unwrap()
98          .render_loop(&ws, &log_should_exit)
99          .await
100      } else {
101        InlineRenderer::new()
102          .render_loop(&ws, &log_should_exit)
103          .await
104      };
105      match result {
106        Ok(true) => runner_should_exit.notify_one(),
107        Ok(false) => {}
108        Err(e) => {
109          eprintln!("{e}");
110          runner_should_exit.notify_one();
111        }
112      }
113    })
114  }
115
116  fn build_task_graph(
117    &self,
118    cmd_graph: &CommandGraph,
119    runtime: Option<CommandRuntime>,
120  ) -> (TaskGraph, HashMap<Task, TaskFuture>) {
121    let futures = RefCell::new(HashMap::new());
122    let task_pool = RefCell::new(HashMap::new());
123
124    let tasks_for = |cmd: &Command| -> Vec<Task> {
125      macro_rules! add_task {
126        ($key:expr, $task:expr, $deps:expr, $files:expr) => {{
127          task_pool
128            .borrow_mut()
129            .entry($key.clone())
130            .or_insert_with(|| {
131              let can_skip = self.common.incremental
132                && !matches!(runtime, Some(CommandRuntime::RunForever))
133                && match $files {
134                  Some(files) => {
135                    let fingerprints = self.fingerprints.read().unwrap();
136                    fingerprints.can_skip(&$key, files)
137                  }
138                  None => false,
139                };
140
141              let (task, future) = Task::make($key, cmd.clone(), $task, $deps, can_skip);
142              futures.borrow_mut().insert(task.clone(), future);
143              task
144            })
145            .clone()
146        }};
147      }
148
149      match &**cmd {
150        CommandInner::Package(pkg_cmd) => self
151          .roots
152          .iter()
153          .flat_map(|pkg| {
154            self.pkg_graph.all_deps_for(pkg).chain([pkg]).map(|pkg| {
155              let pkg = pkg.clone();
156              let key = pkg_cmd.pkg_key(&pkg);
157              let deps = self
158                .pkg_graph
159                .immediate_deps_for(&pkg)
160                .map(|pkg| pkg_cmd.pkg_key(pkg))
161                .collect();
162              let files = pkg.all_files().collect::<Vec<_>>();
163              add_task!(key, cmd.clone().run_pkg(pkg), deps, Some(files))
164            })
165          })
166          .collect(),
167        CommandInner::Workspace(ws_cmd) => {
168          let this = self.clone();
169          let key = ws_cmd.ws_key();
170          let deps = vec![];
171          let files = ws_cmd.input_files(self);
172          vec![add_task!(key, cmd.clone().run_ws(this), deps, files)]
173        }
174      }
175    };
176
177    let task_graph = DepGraph::build(
178      cmd_graph.roots().flat_map(tasks_for).collect(),
179      |t| t.key.clone(),
180      |task: &Task| {
181        let mut deps = cmd_graph
182          .immediate_deps_for(&task.command)
183          .flat_map(tasks_for)
184          .collect::<Vec<_>>();
185        let runtime = task.command.runtime();
186        if let Some(CommandRuntime::WaitForDependencies) = runtime {
187          deps.extend(task.deps.iter().map(|key| task_pool.borrow()[key].clone()));
188        }
189        deps
190      },
191    )
192    .unwrap();
193
194    (task_graph, futures.into_inner())
195  }
196
197  pub async fn run(&self, root: Command) -> Result<()> {
198    let runtime = root.runtime();
199    let cmd_graph = build_command_graph(&root);
200    let (task_graph, mut task_futures) = self.build_task_graph(&cmd_graph, runtime);
201
202    let log_should_exit: Arc<Notify> = Arc::new(Notify::new());
203    let runner_should_exit: Arc<Notify> = Arc::new(Notify::new());
204
205    let runner_should_exit_fut = runner_should_exit.notified();
206    tokio::pin!(runner_should_exit_fut);
207
208    let cleanup_logs = self.spawn_log_thread(&log_should_exit, &runner_should_exit, runtime);
209
210    let mut running_futures = Vec::new();
211    let result = loop {
212      let finished = task_graph
213        .nodes()
214        .all(|task| task.status() == TaskStatus::Finished);
215      if finished {
216        break Ok(());
217      }
218
219      let pending = task_graph
220        .nodes()
221        .filter(|task| task.status() == TaskStatus::Pending);
222      for task in pending {
223        let imm_deps = task_graph.immediate_deps_for(task).collect::<Vec<_>>();
224        let deps_finished = imm_deps
225          .iter()
226          .all(|dep| dep.status() == TaskStatus::Finished);
227        if deps_finished {
228          let can_skip = task.can_skip && imm_deps.iter().all(|dep| dep.can_skip);
229          let task_fut = task_futures.remove(task).unwrap();
230          if can_skip {
231            task.status.store(TaskStatus::Finished, Ordering::SeqCst);
232          } else {
233            debug!("Starting task for: {}", task.key());
234            task.status.store(TaskStatus::Running, Ordering::SeqCst);
235            running_futures.push(tokio::spawn(task_fut()));
236          }
237        }
238      }
239
240      if running_futures.is_empty() {
241        continue;
242      }
243
244      let one_output = futures::future::select_all(&mut running_futures);
245      let (result, idx, _) = tokio::select! { biased;
246        () = &mut runner_should_exit_fut => break Ok(()),
247        output = one_output => output,
248      };
249
250      running_futures.remove(idx);
251
252      let (result, completed_task) = result?;
253
254      if result.is_err() {
255        break result;
256      }
257
258      debug!("Finishing task for: {}", completed_task.key());
259      completed_task
260        .status
261        .store(TaskStatus::Finished, Ordering::SeqCst);
262      self
263        .fingerprints
264        .write()
265        .unwrap()
266        .update_time(completed_task.key().to_string());
267    };
268
269    for fut in &mut running_futures {
270      fut.abort();
271    }
272
273    for fut in &mut running_futures {
274      let _ = fut.await;
275    }
276
277    log::debug!("All tasks complete, waiting for log thread to exit");
278    log_should_exit.notify_one();
279    cleanup_logs.await;
280
281    if root.name() != "clean" {
282      self.fingerprints.read().unwrap().save(&self.root)?;
283    }
284
285    result
286  }
287}