Skip to main content

darklua_core/frontend/
worker_tree.rs

1use std::{
2    collections::{HashMap, HashSet},
3    path::{Path, PathBuf},
4};
5
6use petgraph::{algo::toposort, graph::NodeIndex, stable_graph::StableDiGraph, visit::Dfs};
7use xxhash_rust::xxh3::xxh3_64;
8
9use crate::{
10    frontend::utils::maybe_plural,
11    utils::{clear_luau_configuration_cache, Timer},
12    DarkluaError,
13};
14
15use super::{
16    normalize_path, resources::ResourceContent, work_item::WorkStatus, Configuration,
17    DarkluaResult, Options, Resources, WorkItem, Worker,
18};
19
20/// A structure that manages the processing of Lua/Luau files and their dependencies.
21///
22/// Under the hood, the `WorkerTree` maintains a directed graph of work items, where each node
23/// represents a file to be processed and edges represent dependencies between files. It handles
24/// the collection and processing of work items, manages file dependencies, and tracks the status
25/// of each work item.
26#[derive(Debug, Default)]
27pub struct WorkerTree {
28    graph: StableDiGraph<WorkItem, ()>,
29    node_map: HashMap<PathBuf, NodeIndex>,
30    external_dependencies: HashMap<PathBuf, HashSet<NodeIndex>>,
31    remove_files: Vec<PathBuf>,
32    last_configuration_hash: Option<u64>,
33    output_structure: Option<HashMap<PathBuf, bool>>,
34}
35
36impl WorkerTree {
37    /// Collects work items based on the provided resources and options.
38    ///
39    /// This method traverses the input directory or file specified in the options and
40    /// creates work items for each Lua/Luau file that needs to be processed. It also sets up
41    /// the output paths based on the provided options.
42    pub fn collect_work(&mut self, resources: &Resources, options: &Options) -> DarkluaResult<()> {
43        log::trace!("start collecting work");
44        let collect_work_timer = Timer::now();
45
46        if let Some(output) = options.output().map(Path::to_path_buf) {
47            if resources.is_file(options.input())? {
48                if resources.is_directory(&output)? {
49                    let file_name = options.input().file_name().ok_or_else(|| {
50                        DarkluaError::custom(format!(
51                            "unable to extract file name from `{}`",
52                            options.input().display()
53                        ))
54                    })?;
55
56                    self.add_source_if_missing(options.input(), Some(output.join(file_name)));
57                } else if resources.is_file(&output)? || output.extension().is_some() {
58                    self.add_source_if_missing(options.input(), Some(output));
59                } else {
60                    let file_name = options.input().file_name().ok_or_else(|| {
61                        DarkluaError::custom(format!(
62                            "unable to extract file name from `{}`",
63                            options.input().display()
64                        ))
65                    })?;
66
67                    self.add_source_if_missing(options.input(), Some(output.join(file_name)));
68                }
69            } else {
70                let input = normalize_path(options.input());
71
72                for source in resources.walk(&input) {
73                    let source = normalize_path(source);
74
75                    let relative_path = source.strip_prefix(&input).map_err(|err| {
76                        DarkluaError::custom(format!(
77                            "unable to remove path prefix `{}` from `{}`: {}",
78                            input.display(),
79                            source.display(),
80                            err
81                        ))
82                    })?;
83
84                    let output_path = Some(output.join(relative_path));
85                    self.add_source_if_missing(source, output_path);
86                }
87            }
88        } else {
89            let input = normalize_path(options.input());
90
91            for source in resources.walk(input) {
92                self.add_source_if_missing(source, None);
93            }
94        }
95
96        log::trace!("work collected in {}", collect_work_timer.duration_label());
97
98        Ok(())
99    }
100
101    /// Processes all collected work items according to the provided options.
102    ///
103    /// This method performs the actual processing of work items in topological order,
104    /// respecting dependencies between files.
105    pub fn process(&mut self, resources: &Resources, mut options: Options) -> DarkluaResult<()> {
106        clear_luau_configuration_cache();
107
108        let mut worker = Worker::new(resources);
109        worker.setup_worker(&mut options)?;
110
111        if self.has_configuration_changed(worker.configuration()) {
112            log::debug!("configuration change detected");
113            self.reset();
114        }
115
116        let total_not_done = self
117            .graph
118            .node_weights()
119            .filter(|work_item| !work_item.status.is_done())
120            .count();
121
122        if total_not_done == 0 {
123            return Ok(());
124        }
125
126        let work_timer = Timer::now();
127
128        'work_loop: loop {
129            let mut add_edges = Vec::new();
130
131            match toposort(&self.graph, None) {
132                Ok(node_indexes) => {
133                    let mut done_count = 0;
134
135                    for node_index in node_indexes {
136                        let work_item = self
137                            .graph
138                            .node_weight_mut(node_index)
139                            .expect("node index should exist");
140
141                        if !work_item.status.is_done() {
142                            match worker.advance_work(work_item) {
143                                Ok(()) => match &work_item.status {
144                                    WorkStatus::Done(result) => {
145                                        done_count += 1;
146                                        if result.is_ok() {
147                                            log::info!(
148                                                "successfully processed `{}`",
149                                                work_item.source().display()
150                                            );
151                                        }
152                                    }
153                                    WorkStatus::InProgress(progress) => {
154                                        for content in progress.required_content() {
155                                            if let Some(content_node_index) =
156                                                self.node_map.get(content)
157                                            {
158                                                add_edges.push((*content_node_index, node_index));
159                                            }
160                                        }
161                                        log::trace!(
162                                            "work on `{}` has not completed",
163                                            work_item.source().display()
164                                        );
165                                    }
166                                    WorkStatus::NotStarted => {}
167                                },
168                                Err(err) => {
169                                    log::error!(
170                                        "an error happened while processing {}: {}",
171                                        work_item.source().display(),
172                                        err
173                                    );
174                                    work_item.status = WorkStatus::err(err);
175                                    done_count += 1;
176                                    if options.should_fail_fast() {
177                                        log::debug!(
178                                            "dropping all work because the fail-fast option is enabled"
179                                        );
180                                        break 'work_loop;
181                                    }
182                                }
183                            }
184                        }
185
186                        for path in work_item.external_file_dependencies.iter() {
187                            let container = self
188                                .external_dependencies
189                                .entry(path.to_path_buf())
190                                .or_default();
191
192                            if !container.contains(&node_index) {
193                                log::trace!(
194                                    "link external dependency {} to {}",
195                                    path.display(),
196                                    work_item.source().display()
197                                );
198                                container.insert(node_index);
199                            }
200                        }
201                    }
202
203                    log::debug!("process batch of tasks ({}/{})", done_count, total_not_done);
204
205                    if done_count == total_not_done {
206                        break;
207                    }
208                }
209                Err(_cycle_err) => {
210                    return Err(DarkluaError::cyclic_work(
211                        self.graph
212                            .node_weights()
213                            .filter(|item| !item.status.is_done())
214                            .collect(),
215                    ));
216                }
217            }
218
219            for (from, to) in add_edges {
220                self.graph.add_edge(from, to, ());
221            }
222        }
223
224        log::info!("executed work in {}", work_timer.duration_label());
225
226        self.clean_files(resources);
227
228        Ok(())
229    }
230
231    fn clean_files(&mut self, resources: &Resources) {
232        if self.remove_files.is_empty() {
233            return;
234        }
235
236        let remove_count = self.remove_files.len();
237        log::debug!("clean {} file{}", remove_count, maybe_plural(remove_count));
238
239        for path in self.remove_files.drain(..) {
240            log::trace!("remove file {}", path.display());
241            if let Err(err) = resources.remove(&path).map_err(DarkluaError::from) {
242                log::warn!("failed to remove resource: {}", err);
243            } else if let Some(structure) = self.output_structure.as_ref() {
244                for ancestor in path.ancestors().skip(1) {
245                    if ancestor.components().count() == 0 {
246                        break;
247                    }
248                    if !structure.contains_key(ancestor)
249                        && resources.is_empty_directory(ancestor).unwrap_or_default()
250                    {
251                        if let Err(err) = resources.remove(ancestor).map_err(DarkluaError::from) {
252                            log::warn!("failed to remove resource: {}", err);
253                        }
254                    } else {
255                        break;
256                    }
257                }
258            }
259        }
260    }
261
262    /// Returns the final result of processing all work items.
263    ///
264    /// This method consumes the `WorkerTree` and returns either Ok(()) if all work items
265    /// were processed successfully, or a vector of errors if any work items failed.
266    pub fn result(self) -> Result<(), Vec<DarkluaError>> {
267        let errors: Vec<_> = self.iter_errors().cloned().collect();
268        if errors.is_empty() {
269            Ok(())
270        } else {
271            Err(errors)
272        }
273    }
274
275    /// Collects all errors that occurred during processing.
276    pub fn collect_errors(&self) -> Vec<&DarkluaError> {
277        self.iter_errors().collect()
278    }
279
280    fn iter_errors(&self) -> impl Iterator<Item = &DarkluaError> {
281        self.graph
282            .node_weights()
283            .filter_map(|work_item| match &work_item.status {
284                WorkStatus::NotStarted | WorkStatus::InProgress(_) => None,
285                WorkStatus::Done(result) => result.as_ref().err(),
286            })
287    }
288
289    /// Returns the number of successfully processed work items.
290    pub fn success_count(&self) -> usize {
291        self.graph
292            .node_weights()
293            .filter_map(|work_item| match &work_item.status {
294                WorkStatus::NotStarted | WorkStatus::InProgress(_) => None,
295                WorkStatus::Done(result) => result.as_ref().ok(),
296            })
297            .count()
298    }
299
300    /// Returns an iterator over all external dependencies.
301    pub fn iter_external_dependencies(&self) -> impl Iterator<Item = &Path> {
302        self.external_dependencies
303            .iter()
304            .filter_map(|(path, container)| (!container.is_empty()).then_some(path.as_path()))
305    }
306
307    /// Resets the worker tree to its initial state.
308    pub fn reset(&mut self) {
309        self.graph.node_weights_mut().for_each(|work_item| {
310            work_item.reset();
311        });
312        self.external_dependencies.clear();
313    }
314
315    /// Notifies the worker tree that a source file has changed.
316    pub fn source_changed(&mut self, path: impl AsRef<Path>) {
317        let path = normalize_path(path.as_ref());
318
319        if let Some(node_index) = self.node_map.get(&path) {
320            self.restart_work(*node_index);
321        } else {
322            let node_indexes: Vec<_> = self
323                .node_map
324                .iter()
325                .filter_map(|(node_path, node_index)| {
326                    node_path.starts_with(&path).then_some(*node_index)
327                })
328                .collect();
329
330            for node_index in node_indexes {
331                self.restart_work(node_index);
332            }
333        }
334
335        self.update_external_dependencies(&path);
336    }
337
338    fn update_external_dependencies(&mut self, path: &Path) {
339        let node_indexes = self
340            .external_dependencies
341            .get(path)
342            .map(|nodes| nodes.iter().copied().collect::<Vec<_>>())
343            .unwrap_or_default();
344
345        for index in node_indexes {
346            self.restart_work(index);
347        }
348    }
349
350    /// Removes a source file from the worker tree.
351    pub fn remove_source(&mut self, path: impl AsRef<Path>) {
352        let path = normalize_path(path.as_ref());
353
354        if let Some(node_index) = self.node_map.get(&path).copied() {
355            let root_item = self
356                .graph
357                .node_weight_mut(node_index)
358                .expect("node index should exist");
359
360            if !root_item.data.is_in_place() {
361                self.remove_files
362                    .push(root_item.data.output().to_path_buf());
363            }
364
365            self.restart_work(node_index);
366
367            self.graph.remove_node(node_index);
368            self.node_map.remove(&path);
369        } else {
370            let mut remove_nodes = Vec::new();
371
372            self.node_map.retain(|node_path, node_index| {
373                if node_path.starts_with(&path) {
374                    remove_nodes.push(*node_index);
375                    false
376                } else {
377                    true
378                }
379            });
380
381            for node_index in remove_nodes {
382                if let Some(work_item) = self.graph.remove_node(node_index) {
383                    if !work_item.data.is_in_place() {
384                        self.remove_files
385                            .push(work_item.data.output().to_path_buf());
386                    }
387                }
388            }
389        }
390
391        self.update_external_dependencies(&path);
392    }
393
394    /// Checks if a source file is present in the worker tree.
395    pub fn contains(&mut self, path: impl AsRef<Path>) -> bool {
396        let path = normalize_path(path.as_ref());
397        self.node_map.contains_key(&path)
398    }
399
400    /// Adds a source file to the worker tree.
401    pub fn add_source(&mut self, path: impl AsRef<Path>, output: Option<PathBuf>) {
402        let path = normalize_path(path.as_ref());
403
404        self.update_external_dependencies(&path);
405
406        if let Some(node_index) = self.node_map.get(&path) {
407            self.restart_work(*node_index);
408        } else {
409            self.insert_source(path, output);
410        }
411    }
412
413    fn add_source_if_missing(&mut self, path: impl AsRef<Path>, output: Option<PathBuf>) {
414        let path = normalize_path(path.as_ref());
415
416        if !self.node_map.contains_key(&path) {
417            self.insert_source(path, output);
418        }
419    }
420
421    fn insert_source(&mut self, path: PathBuf, output: Option<PathBuf>) {
422        let node_index = self.graph.add_node(if let Some(output) = output {
423            WorkItem::new(path.clone(), output)
424        } else {
425            WorkItem::new_in_place(path.clone())
426        });
427        self.node_map.insert(path, node_index);
428    }
429
430    fn restart_work(&mut self, node_index: NodeIndex) {
431        let mut dfs = Dfs::new(&self.graph, node_index);
432
433        while let Some(dependent_node) = dfs.next(&self.graph) {
434            let item = self
435                .graph
436                .node_weight_mut(dependent_node)
437                .expect("node index should exist");
438
439            log::debug!("restart work for {}", item.source().display());
440            for path in item.external_file_dependencies.iter() {
441                if let Some(container) = self.external_dependencies.get_mut(path) {
442                    container.remove(&node_index);
443                }
444            }
445            item.reset();
446        }
447    }
448
449    fn has_configuration_changed(&mut self, config: &Configuration) -> bool {
450        let input = serde_json::to_vec(config).ok().unwrap_or_default();
451
452        let new_hash = xxh3_64(&input);
453
454        let last_hash = self.last_configuration_hash.replace(new_hash);
455
456        last_hash
457            .map(|last_hash| new_hash != last_hash)
458            .unwrap_or_default()
459    }
460
461    pub(crate) fn snapshot_output_structure(
462        &mut self,
463        resources: &Resources,
464        location: &Path,
465    ) -> DarkluaResult<()> {
466        if self.output_structure.is_some()
467            || !resources.exists(location)?
468            || !resources.is_directory(location)?
469        {
470            return Ok(());
471        }
472
473        let structure = resources
474            .walk_all(location)
475            .map(|content| match content {
476                ResourceContent::File(path) => (path, true),
477                ResourceContent::Directory(path) => (path, false),
478            })
479            .collect();
480
481        self.output_structure = Some(structure);
482
483        Ok(())
484    }
485}