json_eval_rs/topo_sort/
common.rs

//! Shared utilities for topological sorting
2
3use indexmap::{IndexMap, IndexSet};
4
5/// Compute parallel execution batches from sorted dependencies
6/// 
7/// Groups evaluations into batches where items in the same batch can be evaluated in parallel.
8/// Each batch depends only on items from previous batches.
9/// 
10/// Handles table column dependencies by mapping them back to their parent table.
11pub fn compute_parallel_batches(
12    sorted: &IndexSet<String>,
13    graph: &IndexMap<String, IndexSet<String>>,
14    table_paths: &IndexSet<String>,
15) -> Vec<Vec<String>> {
16    let mut batches: Vec<Vec<String>> = Vec::new();
17    let mut node_to_batch: IndexMap<String, usize> = IndexMap::new();
18    
19    for node in sorted {
20        // Find the maximum batch level of all dependencies that are in the graph
21        let deps = graph.get(node);
22        
23        let max_dep_batch = if let Some(deps) = deps {
24            deps.iter()
25                .filter_map(|dep| {
26                    // Check if dependency is in sorted list directly
27                    if sorted.contains(dep) {
28                        return node_to_batch.get(dep).copied();
29                    }
30                    
31                    // Check if dependency is a table column path (e.g., TABLE/$table/0/column)
32                    // If so, map it to the parent table path
33                    if dep.contains("/$table/") {
34                        for table_path in table_paths {
35                            if dep.starts_with(table_path) {
36                                // Found the parent table, check its batch
37                                return node_to_batch.get(table_path).copied();
38                            }
39                        }
40                    }
41                    
42                    // Check for other table-related paths like $datas, $skip, $clear
43                    if dep.contains("/$datas/") || dep.ends_with("/$skip") || dep.ends_with("/$clear") {
44                        for table_path in table_paths {
45                            if dep.starts_with(table_path) {
46                                return node_to_batch.get(table_path).copied();
47                            }
48                        }
49                    }
50                    
51                    // Dependency is external (not in our graph), ignore it
52                    None
53                })
54                .max()
55        } else {
56            None
57        };
58        
59        // This node goes in the batch after the max dependency batch
60        let batch_idx = max_dep_batch.map(|b| b + 1).unwrap_or(0);
61        
62        // Ensure we have enough batches
63        while batches.len() <= batch_idx {
64            batches.push(Vec::new());
65        }
66        
67        batches[batch_idx].push(node.clone());
68        node_to_batch.insert(node.clone(), batch_idx);
69    }
70    
71    batches
72}
73
74/// Recursively collect all transitive dependencies, excluding tables themselves
75pub fn collect_transitive_deps(
76    deps: &IndexSet<String>,
77    graph: &IndexMap<String, IndexSet<String>>,
78    table_paths: &IndexSet<String>,
79    result: &mut IndexSet<String>,
80) {
81    for dep in deps {
82        // Skip if it's a table (we only want non-table dependencies)
83        if table_paths.contains(dep) {
84            continue;
85        }
86        
87        // Add this dependency if not already added
88        if result.insert(dep.clone()) {
89            // Recursively collect its dependencies
90            if let Some(sub_deps) = graph.get(dep) {
91                collect_transitive_deps(sub_deps, graph, table_paths, result);
92            }
93        }
94    }
95}
96