1use std::{
2 collections::{HashMap, HashSet},
3 path::{Path, PathBuf},
4};
5
6use petgraph::{algo::toposort, graph::NodeIndex, stable_graph::StableDiGraph, visit::Dfs};
7use xxhash_rust::xxh3::xxh3_64;
8
9use crate::{
10 frontend::utils::maybe_plural,
11 utils::{clear_luau_configuration_cache, Timer},
12 DarkluaError,
13};
14
15use super::{
16 normalize_path, work_item::WorkStatus, Configuration, DarkluaResult, Options, Resources,
17 WorkItem, Worker,
18};
19
20#[derive(Debug, Default)]
27pub struct WorkerTree {
28 graph: StableDiGraph<WorkItem, ()>,
29 node_map: HashMap<PathBuf, NodeIndex>,
30 external_dependencies: HashMap<PathBuf, HashSet<NodeIndex>>,
31 remove_files: Vec<PathBuf>,
32 last_configuration_hash: Option<u64>,
33}
34
35impl WorkerTree {
36 pub fn collect_work(&mut self, resources: &Resources, options: &Options) -> DarkluaResult<()> {
42 log::trace!("start collecting work");
43 let collect_work_timer = Timer::now();
44
45 if let Some(output) = options.output().map(Path::to_path_buf) {
46 if resources.is_file(options.input())? {
47 if resources.is_directory(&output)? {
48 let file_name = options.input().file_name().ok_or_else(|| {
49 DarkluaError::custom(format!(
50 "unable to extract file name from `{}`",
51 options.input().display()
52 ))
53 })?;
54
55 self.add_source_if_missing(options.input(), Some(output.join(file_name)));
56 } else if resources.is_file(&output)? || output.extension().is_some() {
57 self.add_source_if_missing(options.input(), Some(output));
58 } else {
59 let file_name = options.input().file_name().ok_or_else(|| {
60 DarkluaError::custom(format!(
61 "unable to extract file name from `{}`",
62 options.input().display()
63 ))
64 })?;
65
66 self.add_source_if_missing(options.input(), Some(output.join(file_name)));
67 }
68 } else {
69 let input = normalize_path(options.input());
70
71 for source in resources.collect_work(&input) {
72 let source = normalize_path(source);
73
74 let relative_path = source.strip_prefix(&input).map_err(|err| {
75 DarkluaError::custom(format!(
76 "unable to remove path prefix `{}` from `{}`: {}",
77 input.display(),
78 source.display(),
79 err
80 ))
81 })?;
82
83 let output_path = Some(output.join(relative_path));
84 self.add_source_if_missing(source, output_path);
85 }
86 }
87 } else {
88 let input = normalize_path(options.input());
89
90 for source in resources.collect_work(input) {
91 self.add_source_if_missing(source, None);
92 }
93 }
94
95 log::trace!("work collected in {}", collect_work_timer.duration_label());
96
97 Ok(())
98 }
99
100 pub fn process(&mut self, resources: &Resources, mut options: Options) -> DarkluaResult<()> {
105 clear_luau_configuration_cache();
106
107 if !self.remove_files.is_empty() {
108 let remove_count = self.remove_files.len();
109 log::debug!(
110 "clean {} file{} before beginning process",
111 remove_count,
112 maybe_plural(remove_count)
113 );
114 for path in self.remove_files.drain(..) {
115 log::trace!("remove file {}", path.display());
116 if let Err(err) = resources.remove(path).map_err(DarkluaError::from) {
117 log::warn!("failed to remove resource: {}", err);
118 }
119 }
120 }
121
122 let mut worker = Worker::new(resources);
123 worker.setup_worker(&mut options)?;
124
125 if self.has_configuration_changed(worker.configuration()) {
126 log::debug!("configuration change detected");
127 self.reset();
128 }
129
130 let total_not_done = self
131 .graph
132 .node_weights()
133 .filter(|work_item| !work_item.status.is_done())
134 .count();
135
136 if total_not_done == 0 {
137 return Ok(());
138 }
139
140 let work_timer = Timer::now();
141
142 'work_loop: loop {
143 let mut add_edges = Vec::new();
144
145 match toposort(&self.graph, None) {
146 Ok(node_indexes) => {
147 let mut done_count = 0;
148
149 for node_index in node_indexes {
150 let work_item = self
151 .graph
152 .node_weight_mut(node_index)
153 .expect("node index should exist");
154
155 if !work_item.status.is_done() {
156 match worker.advance_work(work_item) {
157 Ok(()) => match &work_item.status {
158 WorkStatus::Done(result) => {
159 done_count += 1;
160 if result.is_ok() {
161 log::info!(
162 "successfully processed `{}`",
163 work_item.source().display()
164 );
165 }
166 }
167 WorkStatus::InProgress(progress) => {
168 for content in progress.required_content() {
169 if let Some(content_node_index) =
170 self.node_map.get(content)
171 {
172 add_edges.push((*content_node_index, node_index));
173 }
174 }
175 log::trace!(
176 "work on `{}` has not completed",
177 work_item.source().display()
178 );
179 }
180 WorkStatus::NotStarted => {}
181 },
182 Err(err) => {
183 log::error!(
184 "an error happened while processing {}: {}",
185 work_item.source().display(),
186 err
187 );
188 work_item.status = WorkStatus::err(err);
189 done_count += 1;
190 if options.should_fail_fast() {
191 log::debug!(
192 "dropping all work because the fail-fast option is enabled"
193 );
194 break 'work_loop;
195 }
196 }
197 }
198 }
199
200 for path in work_item.external_file_dependencies.iter() {
201 let container = self
202 .external_dependencies
203 .entry(path.to_path_buf())
204 .or_default();
205
206 if !container.contains(&node_index) {
207 log::trace!(
208 "link external dependency {} to {}",
209 path.display(),
210 work_item.source().display()
211 );
212 container.insert(node_index);
213 }
214 }
215 }
216
217 log::debug!("process batch of tasks ({}/{})", done_count, total_not_done);
218
219 if done_count == total_not_done {
220 break;
221 }
222 }
223 Err(_cycle_err) => {
224 return Err(DarkluaError::cyclic_work(
225 self.graph
226 .node_weights()
227 .filter(|item| !item.status.is_done())
228 .collect(),
229 ));
230 }
231 }
232
233 for (from, to) in add_edges {
234 self.graph.add_edge(from, to, ());
235 }
236 }
237
238 log::info!("executed work in {}", work_timer.duration_label());
239
240 Ok(())
241 }
242
243 pub fn result(self) -> Result<(), Vec<DarkluaError>> {
248 let errors: Vec<_> = self.iter_errors().cloned().collect();
249 if errors.is_empty() {
250 Ok(())
251 } else {
252 Err(errors)
253 }
254 }
255
256 pub fn collect_errors(&self) -> Vec<&DarkluaError> {
258 self.iter_errors().collect()
259 }
260
261 fn iter_errors(&self) -> impl Iterator<Item = &DarkluaError> {
262 self.graph
263 .node_weights()
264 .filter_map(|work_item| match &work_item.status {
265 WorkStatus::NotStarted | WorkStatus::InProgress(_) => None,
266 WorkStatus::Done(result) => result.as_ref().err(),
267 })
268 }
269
270 pub fn success_count(&self) -> usize {
272 self.graph
273 .node_weights()
274 .filter_map(|work_item| match &work_item.status {
275 WorkStatus::NotStarted | WorkStatus::InProgress(_) => None,
276 WorkStatus::Done(result) => result.as_ref().ok(),
277 })
278 .count()
279 }
280
281 pub fn iter_external_dependencies(&self) -> impl Iterator<Item = &Path> {
283 self.external_dependencies
284 .iter()
285 .filter_map(|(path, container)| (!container.is_empty()).then_some(path.as_path()))
286 }
287
288 pub fn reset(&mut self) {
290 self.graph.node_weights_mut().for_each(|work_item| {
291 work_item.reset();
292 });
293 self.external_dependencies.clear();
294 }
295
296 pub fn source_changed(&mut self, path: impl AsRef<Path>) {
298 let path = normalize_path(path.as_ref());
299
300 if let Some(node_index) = self.node_map.get(&path) {
301 self.restart_work(*node_index);
302 } else {
303 let node_indexes: Vec<_> = self
304 .node_map
305 .iter()
306 .filter_map(|(node_path, node_index)| {
307 node_path.starts_with(&path).then_some(*node_index)
308 })
309 .collect();
310
311 for node_index in node_indexes {
312 self.restart_work(node_index);
313 }
314 }
315
316 self.update_external_dependencies(&path);
317 }
318
319 fn update_external_dependencies(&mut self, path: &Path) {
320 let node_indexes = self
321 .external_dependencies
322 .get(path)
323 .map(|nodes| nodes.iter().copied().collect::<Vec<_>>())
324 .unwrap_or_default();
325
326 for index in node_indexes {
327 self.restart_work(index);
328 }
329 }
330
331 pub fn remove_source(&mut self, path: impl AsRef<Path>) {
333 let path = normalize_path(path.as_ref());
334
335 if let Some(node_index) = self.node_map.get(&path).copied() {
336 let root_item = self
337 .graph
338 .node_weight_mut(node_index)
339 .expect("node index should exist");
340
341 if !root_item.data.is_in_place() {
342 self.remove_files
343 .push(root_item.data.output().to_path_buf());
344 }
345
346 self.restart_work(node_index);
347
348 self.graph.remove_node(node_index);
349 self.node_map.remove(&path);
350 } else {
351 let mut remove_nodes = Vec::new();
352
353 self.node_map.retain(|node_path, node_index| {
354 if node_path.starts_with(&path) {
355 remove_nodes.push(*node_index);
356 false
357 } else {
358 true
359 }
360 });
361
362 for node_index in remove_nodes {
363 if let Some(work_item) = self.graph.remove_node(node_index) {
364 if !work_item.data.is_in_place() {
365 self.remove_files
366 .push(work_item.data.output().to_path_buf());
367 }
368 }
369 }
370 }
371
372 self.update_external_dependencies(&path);
373 }
374
375 pub fn contains(&mut self, path: impl AsRef<Path>) -> bool {
377 let path = normalize_path(path.as_ref());
378 self.node_map.contains_key(&path)
379 }
380
381 pub fn add_source(&mut self, path: impl AsRef<Path>, output: Option<PathBuf>) {
383 let path = normalize_path(path.as_ref());
384
385 self.update_external_dependencies(&path);
386
387 if let Some(node_index) = self.node_map.get(&path) {
388 self.restart_work(*node_index);
389 } else {
390 self.insert_source(path, output);
391 }
392 }
393
394 fn add_source_if_missing(&mut self, path: impl AsRef<Path>, output: Option<PathBuf>) {
395 let path = normalize_path(path.as_ref());
396
397 if !self.node_map.contains_key(&path) {
398 self.insert_source(path, output);
399 }
400 }
401
402 fn insert_source(&mut self, path: PathBuf, output: Option<PathBuf>) {
403 let node_index = self.graph.add_node(if let Some(output) = output {
404 WorkItem::new(path.clone(), output)
405 } else {
406 WorkItem::new_in_place(path.clone())
407 });
408 self.node_map.insert(path, node_index);
409 }
410
411 fn restart_work(&mut self, node_index: NodeIndex) {
412 let mut dfs = Dfs::new(&self.graph, node_index);
413
414 while let Some(dependent_node) = dfs.next(&self.graph) {
415 let item = self
416 .graph
417 .node_weight_mut(dependent_node)
418 .expect("node index should exist");
419
420 log::debug!("restart work for {}", item.source().display());
421 for path in item.external_file_dependencies.iter() {
422 if let Some(container) = self.external_dependencies.get_mut(path) {
423 container.remove(&node_index);
424 }
425 }
426 item.reset();
427 }
428 }
429
430 fn has_configuration_changed(&mut self, config: &Configuration) -> bool {
431 let input = serde_json::to_vec(config).ok().unwrap_or_default();
432
433 let new_hash = xxh3_64(&input);
434
435 let last_hash = self.last_configuration_hash.replace(new_hash);
436
437 last_hash
438 .map(|last_hash| new_hash != last_hash)
439 .unwrap_or_default()
440 }
441}