json_eval_rs/topo_sort/common.rs
1/// Shared utilities for topological sorting
2
3use indexmap::{IndexMap, IndexSet};
4
5/// Compute parallel execution batches from sorted dependencies
6///
7/// Groups evaluations into batches where items in the same batch can be evaluated in parallel.
8/// Each batch depends only on items from previous batches.
9///
10/// Handles table column dependencies by mapping them back to their parent table.
11pub fn compute_parallel_batches(
12 sorted: &IndexSet<String>,
13 graph: &IndexMap<String, IndexSet<String>>,
14 table_paths: &IndexSet<String>,
15) -> Vec<Vec<String>> {
16 let mut batches: Vec<Vec<String>> = Vec::new();
17 let mut node_to_batch: IndexMap<String, usize> = IndexMap::new();
18
19 for node in sorted {
20 // Find the maximum batch level of all dependencies that are in the graph
21 let deps = graph.get(node);
22
23 let max_dep_batch = if let Some(deps) = deps {
24 deps.iter()
25 .filter_map(|dep| {
26 // Check if dependency is in sorted list directly
27 if sorted.contains(dep) {
28 return node_to_batch.get(dep).copied();
29 }
30
31 // Check if dependency is a table column path (e.g., TABLE/$table/0/column)
32 // If so, map it to the parent table path
33 if dep.contains("/$table/") {
34 for table_path in table_paths {
35 if dep.starts_with(table_path) {
36 // Found the parent table, check its batch
37 return node_to_batch.get(table_path).copied();
38 }
39 }
40 }
41
42 // Check for other table-related paths like $datas, $skip, $clear
43 if dep.contains("/$datas/") || dep.ends_with("/$skip") || dep.ends_with("/$clear") {
44 for table_path in table_paths {
45 if dep.starts_with(table_path) {
46 return node_to_batch.get(table_path).copied();
47 }
48 }
49 }
50
51 // Dependency is external (not in our graph), ignore it
52 None
53 })
54 .max()
55 } else {
56 None
57 };
58
59 // This node goes in the batch after the max dependency batch
60 let batch_idx = max_dep_batch.map(|b| b + 1).unwrap_or(0);
61
62 // Ensure we have enough batches
63 while batches.len() <= batch_idx {
64 batches.push(Vec::new());
65 }
66
67 batches[batch_idx].push(node.clone());
68 node_to_batch.insert(node.clone(), batch_idx);
69 }
70
71 batches
72}
73
74/// Recursively collect all transitive dependencies, excluding tables themselves
75pub fn collect_transitive_deps(
76 deps: &IndexSet<String>,
77 graph: &IndexMap<String, IndexSet<String>>,
78 table_paths: &IndexSet<String>,
79 result: &mut IndexSet<String>,
80) {
81 for dep in deps {
82 // Skip if it's a table (we only want non-table dependencies)
83 if table_paths.contains(dep) {
84 continue;
85 }
86
87 // Add this dependency if not already added
88 if result.insert(dep.clone()) {
89 // Recursively collect its dependencies
90 if let Some(sub_deps) = graph.get(dep) {
91 collect_transitive_deps(sub_deps, graph, table_paths, result);
92 }
93 }
94 }
95}
96