1use std::{
2 collections::{HashMap, HashSet},
3 path::{Path, PathBuf},
4};
5
6use petgraph::{algo::toposort, graph::NodeIndex, stable_graph::StableDiGraph, visit::Dfs};
7use xxhash_rust::xxh3::xxh3_64;
8
9use crate::{
10 frontend::utils::maybe_plural,
11 utils::{clear_luau_configuration_cache, Timer},
12 DarkluaError,
13};
14
15use super::{
16 normalize_path, work_item::WorkStatus, Configuration, DarkluaResult, Options, Resources,
17 WorkItem, Worker,
18};
19
20#[derive(Debug, Default)]
21pub struct WorkerTree {
22 graph: StableDiGraph<WorkItem, ()>,
23 node_map: HashMap<PathBuf, NodeIndex>,
24 external_dependencies: HashMap<PathBuf, HashSet<NodeIndex>>,
25 remove_files: Vec<PathBuf>,
26 last_configuration_hash: Option<u64>,
27}
28
29impl WorkerTree {
30 pub fn collect_work(&mut self, resources: &Resources, options: &Options) -> DarkluaResult<()> {
31 log::trace!("start collecting work");
32 let collect_work_timer = Timer::now();
33
34 if let Some(output) = options.output().map(Path::to_path_buf) {
35 if resources.is_file(options.input())? {
36 if resources.is_directory(&output)? {
37 let file_name = options.input().file_name().ok_or_else(|| {
38 DarkluaError::custom(format!(
39 "unable to extract file name from `{}`",
40 options.input().display()
41 ))
42 })?;
43
44 self.add_source_if_missing(options.input(), Some(output.join(file_name)));
45 } else if resources.is_file(&output)? || output.extension().is_some() {
46 self.add_source_if_missing(options.input(), Some(output));
47 } else {
48 let file_name = options.input().file_name().ok_or_else(|| {
49 DarkluaError::custom(format!(
50 "unable to extract file name from `{}`",
51 options.input().display()
52 ))
53 })?;
54
55 self.add_source_if_missing(options.input(), Some(output.join(file_name)));
56 }
57 } else {
58 let input = options.input().to_path_buf();
59
60 for source in resources.collect_work(&input) {
61 let source = normalize_path(source);
62
63 let relative_path = source.strip_prefix(&input).map_err(|err| {
64 DarkluaError::custom(format!(
65 "unable to remove path prefix `{}` from `{}`: {}",
66 input.display(),
67 source.display(),
68 err
69 ))
70 })?;
71
72 let output_path = Some(output.join(relative_path));
73 self.add_source_if_missing(source, output_path);
74 }
75 }
76 } else {
77 let input = options.input().to_path_buf();
78
79 for source in resources.collect_work(input) {
80 self.add_source_if_missing(source, None);
81 }
82 }
83
84 log::trace!("work collected in {}", collect_work_timer.duration_label());
85
86 Ok(())
87 }
88
89 pub fn process(&mut self, resources: &Resources, mut options: Options) -> DarkluaResult<()> {
90 clear_luau_configuration_cache();
91
92 if !self.remove_files.is_empty() {
93 let remove_count = self.remove_files.len();
94 log::debug!(
95 "clean {} file{} before beginning process",
96 remove_count,
97 maybe_plural(remove_count)
98 );
99 for path in self.remove_files.drain(..) {
100 log::trace!("remove file {}", path.display());
101 if let Err(err) = resources.remove(path).map_err(DarkluaError::from) {
102 log::warn!("failed to remove resource: {}", err);
103 }
104 }
105 }
106
107 let mut worker = Worker::new(resources);
108 worker.setup_worker(&mut options)?;
109
110 if self.has_configuration_changed(worker.configuration()) {
111 log::debug!("configuration change detected");
112 self.reset();
113 }
114
115 let total_not_done = self
116 .graph
117 .node_weights()
118 .filter(|work_item| !work_item.status.is_done())
119 .count();
120
121 if total_not_done == 0 {
122 return Ok(());
123 }
124
125 let work_timer = Timer::now();
126
127 'work_loop: loop {
128 let mut add_edges = Vec::new();
129
130 match toposort(&self.graph, None) {
131 Ok(node_indexes) => {
132 let mut done_count = 0;
133
134 for node_index in node_indexes {
135 let work_item = self
136 .graph
137 .node_weight_mut(node_index)
138 .expect("node index should exist");
139
140 if !work_item.status.is_done() {
141 match worker.advance_work(work_item) {
142 Ok(()) => match &work_item.status {
143 WorkStatus::Done(result) => {
144 done_count += 1;
145 if result.is_ok() {
146 log::info!(
147 "successfully processed `{}`",
148 work_item.source().display()
149 );
150 }
151 }
152 WorkStatus::InProgress(progress) => {
153 for content in progress.required_content() {
154 if let Some(content_node_index) =
155 self.node_map.get(content)
156 {
157 add_edges.push((*content_node_index, node_index));
158 }
159 }
160 log::trace!(
161 "work on `{}` has not completed",
162 work_item.source().display()
163 );
164 }
165 WorkStatus::NotStarted => {}
166 },
167 Err(err) => {
168 log::error!(
169 "an error happened while processing {}: {}",
170 work_item.source().display(),
171 err
172 );
173 work_item.status = WorkStatus::err(err);
174 done_count += 1;
175 if options.should_fail_fast() {
176 log::debug!(
177 "dropping all work because the fail-fast option is enabled"
178 );
179 break 'work_loop;
180 }
181 }
182 }
183 }
184
185 for path in work_item.external_file_dependencies.iter() {
186 let container = self
187 .external_dependencies
188 .entry(path.to_path_buf())
189 .or_default();
190
191 if !container.contains(&node_index) {
192 log::trace!(
193 "link external dependency {} to {}",
194 path.display(),
195 work_item.source().display()
196 );
197 container.insert(node_index);
198 }
199 }
200 }
201
202 log::debug!("process batch of tasks ({}/{})", done_count, total_not_done);
203
204 if done_count == total_not_done {
205 break;
206 }
207 }
208 Err(_cycle_err) => {
209 return Err(DarkluaError::cyclic_work(
210 self.graph
211 .node_weights()
212 .filter(|item| !item.status.is_done())
213 .collect(),
214 ));
215 }
216 }
217
218 for (from, to) in add_edges {
219 self.graph.add_edge(from, to, ());
220 }
221 }
222
223 log::info!("executed work in {}", work_timer.duration_label());
224
225 Ok(())
226 }
227
228 pub fn result(self) -> Result<(), Vec<DarkluaError>> {
229 let errors: Vec<_> = self.iter_errors().cloned().collect();
230 if errors.is_empty() {
231 Ok(())
232 } else {
233 Err(errors)
234 }
235 }
236
237 pub fn collect_errors(&self) -> Vec<&DarkluaError> {
238 self.iter_errors().collect()
239 }
240
241 fn iter_errors(&self) -> impl Iterator<Item = &DarkluaError> {
242 self.graph
243 .node_weights()
244 .filter_map(|work_item| match &work_item.status {
245 WorkStatus::NotStarted | WorkStatus::InProgress(_) => None,
246 WorkStatus::Done(result) => result.as_ref().err(),
247 })
248 }
249
250 pub fn success_count(&self) -> usize {
251 self.graph
252 .node_weights()
253 .filter_map(|work_item| match &work_item.status {
254 WorkStatus::NotStarted | WorkStatus::InProgress(_) => None,
255 WorkStatus::Done(result) => result.as_ref().ok(),
256 })
257 .count()
258 }
259
260 pub fn iter_external_dependencies(&self) -> impl Iterator<Item = &Path> {
261 self.external_dependencies
262 .iter()
263 .filter_map(|(path, container)| (!container.is_empty()).then_some(path.as_path()))
264 }
265
266 pub fn reset(&mut self) {
267 self.graph.node_weights_mut().for_each(|work_item| {
268 work_item.reset();
269 });
270 self.external_dependencies.clear();
271 }
272
273 pub fn source_changed(&mut self, path: impl AsRef<Path>) {
274 let path = normalize_path(path.as_ref());
275
276 if let Some(node_index) = self.node_map.get(&path) {
277 self.restart_work(*node_index);
278 } else {
279 let node_indexes: Vec<_> = self
280 .node_map
281 .iter()
282 .filter_map(|(node_path, node_index)| {
283 node_path.starts_with(&path).then_some(*node_index)
284 })
285 .collect();
286
287 for node_index in node_indexes {
288 self.restart_work(node_index);
289 }
290 }
291
292 self.update_external_dependencies(&path);
293 }
294
295 fn update_external_dependencies(&mut self, path: &Path) {
296 let node_indexes = self
297 .external_dependencies
298 .get(path)
299 .map(|nodes| nodes.iter().copied().collect::<Vec<_>>())
300 .unwrap_or_default();
301
302 for index in node_indexes {
303 self.restart_work(index);
304 }
305 }
306
307 pub fn remove_source(&mut self, path: impl AsRef<Path>) {
308 let path = normalize_path(path.as_ref());
309
310 if let Some(node_index) = self.node_map.get(&path).copied() {
311 let root_item = self
312 .graph
313 .node_weight_mut(node_index)
314 .expect("node index should exist");
315
316 if !root_item.data.is_in_place() {
317 self.remove_files
318 .push(root_item.data.output().to_path_buf());
319 }
320
321 self.restart_work(node_index);
322
323 self.graph.remove_node(node_index);
324 self.node_map.remove(&path);
325 } else {
326 let mut remove_nodes = Vec::new();
327
328 self.node_map.retain(|node_path, node_index| {
329 if node_path.starts_with(&path) {
330 remove_nodes.push(*node_index);
331 false
332 } else {
333 true
334 }
335 });
336
337 for node_index in remove_nodes {
338 if let Some(work_item) = self.graph.remove_node(node_index) {
339 if !work_item.data.is_in_place() {
340 self.remove_files
341 .push(work_item.data.output().to_path_buf());
342 }
343 }
344 }
345 }
346
347 self.update_external_dependencies(&path);
348 }
349
350 pub fn contains(&mut self, path: impl AsRef<Path>) -> bool {
351 let path = normalize_path(path.as_ref());
352 self.node_map.contains_key(&path)
353 }
354
355 pub fn add_source(&mut self, path: impl AsRef<Path>, output: Option<PathBuf>) {
356 let path = normalize_path(path.as_ref());
357
358 self.update_external_dependencies(&path);
359
360 if let Some(node_index) = self.node_map.get(&path) {
361 self.restart_work(*node_index);
362 } else {
363 self.insert_source(path, output);
364 }
365 }
366
367 fn add_source_if_missing(&mut self, path: impl AsRef<Path>, output: Option<PathBuf>) {
368 let path = normalize_path(path.as_ref());
369
370 if !self.node_map.contains_key(&path) {
371 self.insert_source(path, output);
372 }
373 }
374
375 fn insert_source(&mut self, path: PathBuf, output: Option<PathBuf>) {
376 let node_index = self.graph.add_node(if let Some(output) = output {
377 WorkItem::new(path.clone(), output)
378 } else {
379 WorkItem::new_in_place(path.clone())
380 });
381 self.node_map.insert(path, node_index);
382 }
383
384 fn restart_work(&mut self, node_index: NodeIndex) {
385 let mut dfs = Dfs::new(&self.graph, node_index);
386
387 while let Some(dependent_node) = dfs.next(&self.graph) {
388 let item = self
389 .graph
390 .node_weight_mut(dependent_node)
391 .expect("node index should exist");
392
393 log::debug!("restart work for {}", item.source().display());
394 for path in item.external_file_dependencies.iter() {
395 if let Some(container) = self.external_dependencies.get_mut(path) {
396 container.remove(&node_index);
397 }
398 }
399 item.reset();
400 }
401 }
402
403 fn has_configuration_changed(&mut self, config: &Configuration) -> bool {
404 let input = serde_json::to_vec(config).ok().unwrap_or_default();
405
406 let new_hash = xxh3_64(&input);
407
408 let last_hash = self.last_configuration_hash.replace(new_hash);
409
410 last_hash
411 .map(|last_hash| new_hash != last_hash)
412 .unwrap_or_default()
413 }
414}