darklua_core/frontend/
worker_tree.rs

1use std::{
2    collections::{HashMap, HashSet},
3    path::{Path, PathBuf},
4};
5
6use petgraph::{algo::toposort, graph::NodeIndex, stable_graph::StableDiGraph, visit::Dfs};
7use xxhash_rust::xxh3::xxh3_64;
8
9use crate::{
10    frontend::utils::maybe_plural,
11    utils::{clear_luau_configuration_cache, Timer},
12    DarkluaError,
13};
14
15use super::{
16    normalize_path, work_item::WorkStatus, Configuration, DarkluaResult, Options, Resources,
17    WorkItem, Worker,
18};
19
20#[derive(Debug, Default)]
21pub struct WorkerTree {
22    graph: StableDiGraph<WorkItem, ()>,
23    node_map: HashMap<PathBuf, NodeIndex>,
24    external_dependencies: HashMap<PathBuf, HashSet<NodeIndex>>,
25    remove_files: Vec<PathBuf>,
26    last_configuration_hash: Option<u64>,
27}
28
29impl WorkerTree {
30    pub fn collect_work(&mut self, resources: &Resources, options: &Options) -> DarkluaResult<()> {
31        log::trace!("start collecting work");
32        let collect_work_timer = Timer::now();
33
34        if let Some(output) = options.output().map(Path::to_path_buf) {
35            if resources.is_file(options.input())? {
36                if resources.is_directory(&output)? {
37                    let file_name = options.input().file_name().ok_or_else(|| {
38                        DarkluaError::custom(format!(
39                            "unable to extract file name from `{}`",
40                            options.input().display()
41                        ))
42                    })?;
43
44                    self.add_source_if_missing(options.input(), Some(output.join(file_name)));
45                } else if resources.is_file(&output)? || output.extension().is_some() {
46                    self.add_source_if_missing(options.input(), Some(output));
47                } else {
48                    let file_name = options.input().file_name().ok_or_else(|| {
49                        DarkluaError::custom(format!(
50                            "unable to extract file name from `{}`",
51                            options.input().display()
52                        ))
53                    })?;
54
55                    self.add_source_if_missing(options.input(), Some(output.join(file_name)));
56                }
57            } else {
58                let input = options.input().to_path_buf();
59
60                for source in resources.collect_work(&input) {
61                    let source = normalize_path(source);
62
63                    let relative_path = source.strip_prefix(&input).map_err(|err| {
64                        DarkluaError::custom(format!(
65                            "unable to remove path prefix `{}` from `{}`: {}",
66                            input.display(),
67                            source.display(),
68                            err
69                        ))
70                    })?;
71
72                    let output_path = Some(output.join(relative_path));
73                    self.add_source_if_missing(source, output_path);
74                }
75            }
76        } else {
77            let input = options.input().to_path_buf();
78
79            for source in resources.collect_work(input) {
80                self.add_source_if_missing(source, None);
81            }
82        }
83
84        log::trace!("work collected in {}", collect_work_timer.duration_label());
85
86        Ok(())
87    }
88
89    pub fn process(&mut self, resources: &Resources, mut options: Options) -> DarkluaResult<()> {
90        clear_luau_configuration_cache();
91
92        if !self.remove_files.is_empty() {
93            let remove_count = self.remove_files.len();
94            log::debug!(
95                "clean {} file{} before beginning process",
96                remove_count,
97                maybe_plural(remove_count)
98            );
99            for path in self.remove_files.drain(..) {
100                log::trace!("remove file {}", path.display());
101                if let Err(err) = resources.remove(path).map_err(DarkluaError::from) {
102                    log::warn!("failed to remove resource: {}", err);
103                }
104            }
105        }
106
107        let mut worker = Worker::new(resources);
108        worker.setup_worker(&mut options)?;
109
110        if self.has_configuration_changed(worker.configuration()) {
111            log::debug!("configuration change detected");
112            self.reset();
113        }
114
115        let total_not_done = self
116            .graph
117            .node_weights()
118            .filter(|work_item| !work_item.status.is_done())
119            .count();
120
121        if total_not_done == 0 {
122            return Ok(());
123        }
124
125        let work_timer = Timer::now();
126
127        'work_loop: loop {
128            let mut add_edges = Vec::new();
129
130            match toposort(&self.graph, None) {
131                Ok(node_indexes) => {
132                    let mut done_count = 0;
133
134                    for node_index in node_indexes {
135                        let work_item = self
136                            .graph
137                            .node_weight_mut(node_index)
138                            .expect("node index should exist");
139
140                        if !work_item.status.is_done() {
141                            match worker.advance_work(work_item) {
142                                Ok(()) => match &work_item.status {
143                                    WorkStatus::Done(result) => {
144                                        done_count += 1;
145                                        if result.is_ok() {
146                                            log::info!(
147                                                "successfully processed `{}`",
148                                                work_item.source().display()
149                                            );
150                                        }
151                                    }
152                                    WorkStatus::InProgress(progress) => {
153                                        for content in progress.required_content() {
154                                            if let Some(content_node_index) =
155                                                self.node_map.get(content)
156                                            {
157                                                add_edges.push((*content_node_index, node_index));
158                                            }
159                                        }
160                                        log::trace!(
161                                            "work on `{}` has not completed",
162                                            work_item.source().display()
163                                        );
164                                    }
165                                    WorkStatus::NotStarted => {}
166                                },
167                                Err(err) => {
168                                    log::error!(
169                                        "an error happened while processing {}: {}",
170                                        work_item.source().display(),
171                                        err
172                                    );
173                                    work_item.status = WorkStatus::err(err);
174                                    done_count += 1;
175                                    if options.should_fail_fast() {
176                                        log::debug!(
177                                            "dropping all work because the fail-fast option is enabled"
178                                        );
179                                        break 'work_loop;
180                                    }
181                                }
182                            }
183                        }
184
185                        for path in work_item.external_file_dependencies.iter() {
186                            let container = self
187                                .external_dependencies
188                                .entry(path.to_path_buf())
189                                .or_default();
190
191                            if !container.contains(&node_index) {
192                                log::trace!(
193                                    "link external dependency {} to {}",
194                                    path.display(),
195                                    work_item.source().display()
196                                );
197                                container.insert(node_index);
198                            }
199                        }
200                    }
201
202                    log::debug!("process batch of tasks ({}/{})", done_count, total_not_done);
203
204                    if done_count == total_not_done {
205                        break;
206                    }
207                }
208                Err(_cycle_err) => {
209                    return Err(DarkluaError::cyclic_work(
210                        self.graph
211                            .node_weights()
212                            .filter(|item| !item.status.is_done())
213                            .collect(),
214                    ));
215                }
216            }
217
218            for (from, to) in add_edges {
219                self.graph.add_edge(from, to, ());
220            }
221        }
222
223        log::info!("executed work in {}", work_timer.duration_label());
224
225        Ok(())
226    }
227
228    pub fn result(self) -> Result<(), Vec<DarkluaError>> {
229        let errors: Vec<_> = self.iter_errors().cloned().collect();
230        if errors.is_empty() {
231            Ok(())
232        } else {
233            Err(errors)
234        }
235    }
236
237    pub fn collect_errors(&self) -> Vec<&DarkluaError> {
238        self.iter_errors().collect()
239    }
240
241    fn iter_errors(&self) -> impl Iterator<Item = &DarkluaError> {
242        self.graph
243            .node_weights()
244            .filter_map(|work_item| match &work_item.status {
245                WorkStatus::NotStarted | WorkStatus::InProgress(_) => None,
246                WorkStatus::Done(result) => result.as_ref().err(),
247            })
248    }
249
250    pub fn success_count(&self) -> usize {
251        self.graph
252            .node_weights()
253            .filter_map(|work_item| match &work_item.status {
254                WorkStatus::NotStarted | WorkStatus::InProgress(_) => None,
255                WorkStatus::Done(result) => result.as_ref().ok(),
256            })
257            .count()
258    }
259
260    pub fn iter_external_dependencies(&self) -> impl Iterator<Item = &Path> {
261        self.external_dependencies
262            .iter()
263            .filter_map(|(path, container)| (!container.is_empty()).then_some(path.as_path()))
264    }
265
266    pub fn reset(&mut self) {
267        self.graph.node_weights_mut().for_each(|work_item| {
268            work_item.reset();
269        });
270        self.external_dependencies.clear();
271    }
272
273    pub fn source_changed(&mut self, path: impl AsRef<Path>) {
274        let path = normalize_path(path.as_ref());
275
276        if let Some(node_index) = self.node_map.get(&path) {
277            self.restart_work(*node_index);
278        } else {
279            let node_indexes: Vec<_> = self
280                .node_map
281                .iter()
282                .filter_map(|(node_path, node_index)| {
283                    node_path.starts_with(&path).then_some(*node_index)
284                })
285                .collect();
286
287            for node_index in node_indexes {
288                self.restart_work(node_index);
289            }
290        }
291
292        self.update_external_dependencies(&path);
293    }
294
295    fn update_external_dependencies(&mut self, path: &Path) {
296        let node_indexes = self
297            .external_dependencies
298            .get(path)
299            .map(|nodes| nodes.iter().copied().collect::<Vec<_>>())
300            .unwrap_or_default();
301
302        for index in node_indexes {
303            self.restart_work(index);
304        }
305    }
306
307    pub fn remove_source(&mut self, path: impl AsRef<Path>) {
308        let path = normalize_path(path.as_ref());
309
310        if let Some(node_index) = self.node_map.get(&path).copied() {
311            let root_item = self
312                .graph
313                .node_weight_mut(node_index)
314                .expect("node index should exist");
315
316            if !root_item.data.is_in_place() {
317                self.remove_files
318                    .push(root_item.data.output().to_path_buf());
319            }
320
321            self.restart_work(node_index);
322
323            self.graph.remove_node(node_index);
324            self.node_map.remove(&path);
325        } else {
326            let mut remove_nodes = Vec::new();
327
328            self.node_map.retain(|node_path, node_index| {
329                if node_path.starts_with(&path) {
330                    remove_nodes.push(*node_index);
331                    false
332                } else {
333                    true
334                }
335            });
336
337            for node_index in remove_nodes {
338                if let Some(work_item) = self.graph.remove_node(node_index) {
339                    if !work_item.data.is_in_place() {
340                        self.remove_files
341                            .push(work_item.data.output().to_path_buf());
342                    }
343                }
344            }
345        }
346
347        self.update_external_dependencies(&path);
348    }
349
350    pub fn contains(&mut self, path: impl AsRef<Path>) -> bool {
351        let path = normalize_path(path.as_ref());
352        self.node_map.contains_key(&path)
353    }
354
355    pub fn add_source(&mut self, path: impl AsRef<Path>, output: Option<PathBuf>) {
356        let path = normalize_path(path.as_ref());
357
358        self.update_external_dependencies(&path);
359
360        if let Some(node_index) = self.node_map.get(&path) {
361            self.restart_work(*node_index);
362        } else {
363            self.insert_source(path, output);
364        }
365    }
366
367    fn add_source_if_missing(&mut self, path: impl AsRef<Path>, output: Option<PathBuf>) {
368        let path = normalize_path(path.as_ref());
369
370        if !self.node_map.contains_key(&path) {
371            self.insert_source(path, output);
372        }
373    }
374
375    fn insert_source(&mut self, path: PathBuf, output: Option<PathBuf>) {
376        let node_index = self.graph.add_node(if let Some(output) = output {
377            WorkItem::new(path.clone(), output)
378        } else {
379            WorkItem::new_in_place(path.clone())
380        });
381        self.node_map.insert(path, node_index);
382    }
383
384    fn restart_work(&mut self, node_index: NodeIndex) {
385        let mut dfs = Dfs::new(&self.graph, node_index);
386
387        while let Some(dependent_node) = dfs.next(&self.graph) {
388            let item = self
389                .graph
390                .node_weight_mut(dependent_node)
391                .expect("node index should exist");
392
393            log::debug!("restart work for {}", item.source().display());
394            for path in item.external_file_dependencies.iter() {
395                if let Some(container) = self.external_dependencies.get_mut(path) {
396                    container.remove(&node_index);
397                }
398            }
399            item.reset();
400        }
401    }
402
403    fn has_configuration_changed(&mut self, config: &Configuration) -> bool {
404        let input = serde_json::to_vec(config).ok().unwrap_or_default();
405
406        let new_hash = xxh3_64(&input);
407
408        let last_hash = self.last_configuration_hash.replace(new_hash);
409
410        last_hash
411            .map(|last_hash| new_hash != last_hash)
412            .unwrap_or_default()
413    }
414}