darklua_core/frontend/
worker_tree.rs

1use std::{
2    collections::{HashMap, HashSet},
3    path::{Path, PathBuf},
4};
5
6use petgraph::{algo::toposort, graph::NodeIndex, stable_graph::StableDiGraph, visit::Dfs};
7use xxhash_rust::xxh3::xxh3_64;
8
9use crate::{
10    frontend::utils::maybe_plural,
11    utils::{clear_luau_configuration_cache, Timer},
12    DarkluaError,
13};
14
15use super::{
16    normalize_path, work_item::WorkStatus, Configuration, DarkluaResult, Options, Resources,
17    WorkItem, Worker,
18};
19
20/// A structure that manages the processing of Lua/Luau files and their dependencies.
21///
22/// Under the hood, the `WorkerTree` maintains a directed graph of work items, where each node
23/// represents a file to be processed and edges represent dependencies between files. It handles
24/// the collection and processing of work items, manages file dependencies, and tracks the status
25/// of each work item.
26#[derive(Debug, Default)]
27pub struct WorkerTree {
28    graph: StableDiGraph<WorkItem, ()>,
29    node_map: HashMap<PathBuf, NodeIndex>,
30    external_dependencies: HashMap<PathBuf, HashSet<NodeIndex>>,
31    remove_files: Vec<PathBuf>,
32    last_configuration_hash: Option<u64>,
33}
34
35impl WorkerTree {
36    /// Collects work items based on the provided resources and options.
37    ///
38    /// This method traverses the input directory or file specified in the options and
39    /// creates work items for each Lua/Luau file that needs to be processed. It also sets up
40    /// the output paths based on the provided options.
41    pub fn collect_work(&mut self, resources: &Resources, options: &Options) -> DarkluaResult<()> {
42        log::trace!("start collecting work");
43        let collect_work_timer = Timer::now();
44
45        if let Some(output) = options.output().map(Path::to_path_buf) {
46            if resources.is_file(options.input())? {
47                if resources.is_directory(&output)? {
48                    let file_name = options.input().file_name().ok_or_else(|| {
49                        DarkluaError::custom(format!(
50                            "unable to extract file name from `{}`",
51                            options.input().display()
52                        ))
53                    })?;
54
55                    self.add_source_if_missing(options.input(), Some(output.join(file_name)));
56                } else if resources.is_file(&output)? || output.extension().is_some() {
57                    self.add_source_if_missing(options.input(), Some(output));
58                } else {
59                    let file_name = options.input().file_name().ok_or_else(|| {
60                        DarkluaError::custom(format!(
61                            "unable to extract file name from `{}`",
62                            options.input().display()
63                        ))
64                    })?;
65
66                    self.add_source_if_missing(options.input(), Some(output.join(file_name)));
67                }
68            } else {
69                let input = normalize_path(options.input());
70
71                for source in resources.collect_work(&input) {
72                    let source = normalize_path(source);
73
74                    let relative_path = source.strip_prefix(&input).map_err(|err| {
75                        DarkluaError::custom(format!(
76                            "unable to remove path prefix `{}` from `{}`: {}",
77                            input.display(),
78                            source.display(),
79                            err
80                        ))
81                    })?;
82
83                    let output_path = Some(output.join(relative_path));
84                    self.add_source_if_missing(source, output_path);
85                }
86            }
87        } else {
88            let input = normalize_path(options.input());
89
90            for source in resources.collect_work(input) {
91                self.add_source_if_missing(source, None);
92            }
93        }
94
95        log::trace!("work collected in {}", collect_work_timer.duration_label());
96
97        Ok(())
98    }
99
100    /// Processes all collected work items according to the provided options.
101    ///
102    /// This method performs the actual processing of work items in topological order,
103    /// respecting dependencies between files.
104    pub fn process(&mut self, resources: &Resources, mut options: Options) -> DarkluaResult<()> {
105        clear_luau_configuration_cache();
106
107        if !self.remove_files.is_empty() {
108            let remove_count = self.remove_files.len();
109            log::debug!(
110                "clean {} file{} before beginning process",
111                remove_count,
112                maybe_plural(remove_count)
113            );
114            for path in self.remove_files.drain(..) {
115                log::trace!("remove file {}", path.display());
116                if let Err(err) = resources.remove(path).map_err(DarkluaError::from) {
117                    log::warn!("failed to remove resource: {}", err);
118                }
119            }
120        }
121
122        let mut worker = Worker::new(resources);
123        worker.setup_worker(&mut options)?;
124
125        if self.has_configuration_changed(worker.configuration()) {
126            log::debug!("configuration change detected");
127            self.reset();
128        }
129
130        let total_not_done = self
131            .graph
132            .node_weights()
133            .filter(|work_item| !work_item.status.is_done())
134            .count();
135
136        if total_not_done == 0 {
137            return Ok(());
138        }
139
140        let work_timer = Timer::now();
141
142        'work_loop: loop {
143            let mut add_edges = Vec::new();
144
145            match toposort(&self.graph, None) {
146                Ok(node_indexes) => {
147                    let mut done_count = 0;
148
149                    for node_index in node_indexes {
150                        let work_item = self
151                            .graph
152                            .node_weight_mut(node_index)
153                            .expect("node index should exist");
154
155                        if !work_item.status.is_done() {
156                            match worker.advance_work(work_item) {
157                                Ok(()) => match &work_item.status {
158                                    WorkStatus::Done(result) => {
159                                        done_count += 1;
160                                        if result.is_ok() {
161                                            log::info!(
162                                                "successfully processed `{}`",
163                                                work_item.source().display()
164                                            );
165                                        }
166                                    }
167                                    WorkStatus::InProgress(progress) => {
168                                        for content in progress.required_content() {
169                                            if let Some(content_node_index) =
170                                                self.node_map.get(content)
171                                            {
172                                                add_edges.push((*content_node_index, node_index));
173                                            }
174                                        }
175                                        log::trace!(
176                                            "work on `{}` has not completed",
177                                            work_item.source().display()
178                                        );
179                                    }
180                                    WorkStatus::NotStarted => {}
181                                },
182                                Err(err) => {
183                                    log::error!(
184                                        "an error happened while processing {}: {}",
185                                        work_item.source().display(),
186                                        err
187                                    );
188                                    work_item.status = WorkStatus::err(err);
189                                    done_count += 1;
190                                    if options.should_fail_fast() {
191                                        log::debug!(
192                                            "dropping all work because the fail-fast option is enabled"
193                                        );
194                                        break 'work_loop;
195                                    }
196                                }
197                            }
198                        }
199
200                        for path in work_item.external_file_dependencies.iter() {
201                            let container = self
202                                .external_dependencies
203                                .entry(path.to_path_buf())
204                                .or_default();
205
206                            if !container.contains(&node_index) {
207                                log::trace!(
208                                    "link external dependency {} to {}",
209                                    path.display(),
210                                    work_item.source().display()
211                                );
212                                container.insert(node_index);
213                            }
214                        }
215                    }
216
217                    log::debug!("process batch of tasks ({}/{})", done_count, total_not_done);
218
219                    if done_count == total_not_done {
220                        break;
221                    }
222                }
223                Err(_cycle_err) => {
224                    return Err(DarkluaError::cyclic_work(
225                        self.graph
226                            .node_weights()
227                            .filter(|item| !item.status.is_done())
228                            .collect(),
229                    ));
230                }
231            }
232
233            for (from, to) in add_edges {
234                self.graph.add_edge(from, to, ());
235            }
236        }
237
238        log::info!("executed work in {}", work_timer.duration_label());
239
240        Ok(())
241    }
242
243    /// Returns the final result of processing all work items.
244    ///
245    /// This method consumes the `WorkerTree` and returns either Ok(()) if all work items
246    /// were processed successfully, or a vector of errors if any work items failed.
247    pub fn result(self) -> Result<(), Vec<DarkluaError>> {
248        let errors: Vec<_> = self.iter_errors().cloned().collect();
249        if errors.is_empty() {
250            Ok(())
251        } else {
252            Err(errors)
253        }
254    }
255
256    /// Collects all errors that occurred during processing.
257    pub fn collect_errors(&self) -> Vec<&DarkluaError> {
258        self.iter_errors().collect()
259    }
260
261    fn iter_errors(&self) -> impl Iterator<Item = &DarkluaError> {
262        self.graph
263            .node_weights()
264            .filter_map(|work_item| match &work_item.status {
265                WorkStatus::NotStarted | WorkStatus::InProgress(_) => None,
266                WorkStatus::Done(result) => result.as_ref().err(),
267            })
268    }
269
270    /// Returns the number of successfully processed work items.
271    pub fn success_count(&self) -> usize {
272        self.graph
273            .node_weights()
274            .filter_map(|work_item| match &work_item.status {
275                WorkStatus::NotStarted | WorkStatus::InProgress(_) => None,
276                WorkStatus::Done(result) => result.as_ref().ok(),
277            })
278            .count()
279    }
280
281    /// Returns an iterator over all external dependencies.
282    pub fn iter_external_dependencies(&self) -> impl Iterator<Item = &Path> {
283        self.external_dependencies
284            .iter()
285            .filter_map(|(path, container)| (!container.is_empty()).then_some(path.as_path()))
286    }
287
288    /// Resets the worker tree to its initial state.
289    pub fn reset(&mut self) {
290        self.graph.node_weights_mut().for_each(|work_item| {
291            work_item.reset();
292        });
293        self.external_dependencies.clear();
294    }
295
296    /// Notifies the worker tree that a source file has changed.
297    pub fn source_changed(&mut self, path: impl AsRef<Path>) {
298        let path = normalize_path(path.as_ref());
299
300        if let Some(node_index) = self.node_map.get(&path) {
301            self.restart_work(*node_index);
302        } else {
303            let node_indexes: Vec<_> = self
304                .node_map
305                .iter()
306                .filter_map(|(node_path, node_index)| {
307                    node_path.starts_with(&path).then_some(*node_index)
308                })
309                .collect();
310
311            for node_index in node_indexes {
312                self.restart_work(node_index);
313            }
314        }
315
316        self.update_external_dependencies(&path);
317    }
318
319    fn update_external_dependencies(&mut self, path: &Path) {
320        let node_indexes = self
321            .external_dependencies
322            .get(path)
323            .map(|nodes| nodes.iter().copied().collect::<Vec<_>>())
324            .unwrap_or_default();
325
326        for index in node_indexes {
327            self.restart_work(index);
328        }
329    }
330
331    /// Removes a source file from the worker tree.
332    pub fn remove_source(&mut self, path: impl AsRef<Path>) {
333        let path = normalize_path(path.as_ref());
334
335        if let Some(node_index) = self.node_map.get(&path).copied() {
336            let root_item = self
337                .graph
338                .node_weight_mut(node_index)
339                .expect("node index should exist");
340
341            if !root_item.data.is_in_place() {
342                self.remove_files
343                    .push(root_item.data.output().to_path_buf());
344            }
345
346            self.restart_work(node_index);
347
348            self.graph.remove_node(node_index);
349            self.node_map.remove(&path);
350        } else {
351            let mut remove_nodes = Vec::new();
352
353            self.node_map.retain(|node_path, node_index| {
354                if node_path.starts_with(&path) {
355                    remove_nodes.push(*node_index);
356                    false
357                } else {
358                    true
359                }
360            });
361
362            for node_index in remove_nodes {
363                if let Some(work_item) = self.graph.remove_node(node_index) {
364                    if !work_item.data.is_in_place() {
365                        self.remove_files
366                            .push(work_item.data.output().to_path_buf());
367                    }
368                }
369            }
370        }
371
372        self.update_external_dependencies(&path);
373    }
374
375    /// Checks if a source file is present in the worker tree.
376    pub fn contains(&mut self, path: impl AsRef<Path>) -> bool {
377        let path = normalize_path(path.as_ref());
378        self.node_map.contains_key(&path)
379    }
380
381    /// Adds a source file to the worker tree.
382    pub fn add_source(&mut self, path: impl AsRef<Path>, output: Option<PathBuf>) {
383        let path = normalize_path(path.as_ref());
384
385        self.update_external_dependencies(&path);
386
387        if let Some(node_index) = self.node_map.get(&path) {
388            self.restart_work(*node_index);
389        } else {
390            self.insert_source(path, output);
391        }
392    }
393
394    fn add_source_if_missing(&mut self, path: impl AsRef<Path>, output: Option<PathBuf>) {
395        let path = normalize_path(path.as_ref());
396
397        if !self.node_map.contains_key(&path) {
398            self.insert_source(path, output);
399        }
400    }
401
402    fn insert_source(&mut self, path: PathBuf, output: Option<PathBuf>) {
403        let node_index = self.graph.add_node(if let Some(output) = output {
404            WorkItem::new(path.clone(), output)
405        } else {
406            WorkItem::new_in_place(path.clone())
407        });
408        self.node_map.insert(path, node_index);
409    }
410
411    fn restart_work(&mut self, node_index: NodeIndex) {
412        let mut dfs = Dfs::new(&self.graph, node_index);
413
414        while let Some(dependent_node) = dfs.next(&self.graph) {
415            let item = self
416                .graph
417                .node_weight_mut(dependent_node)
418                .expect("node index should exist");
419
420            log::debug!("restart work for {}", item.source().display());
421            for path in item.external_file_dependencies.iter() {
422                if let Some(container) = self.external_dependencies.get_mut(path) {
423                    container.remove(&node_index);
424                }
425            }
426            item.reset();
427        }
428    }
429
430    fn has_configuration_changed(&mut self, config: &Configuration) -> bool {
431        let input = serde_json::to_vec(config).ok().unwrap_or_default();
432
433        let new_hash = xxh3_64(&input);
434
435        let last_hash = self.last_configuration_hash.replace(new_hash);
436
437        last_hash
438            .map(|last_hash| new_hash != last_hash)
439            .unwrap_or_default()
440    }
441}