json_eval_rs/topo_sort/
parsed.rs

1/// Topological sorting for ParsedSchema
2
3use indexmap::{IndexMap, IndexSet};
4use crate::ParsedSchema;
5use crate::path_utils;
6use crate::topo_sort::common::{compute_parallel_batches, collect_transitive_deps};
7
8pub fn topological_sort_parsed(parsed: &ParsedSchema) -> Result<Vec<Vec<String>>, String> {
9    let mut sorted = IndexSet::new();
10    let mut visited = IndexSet::new();
11    let mut visiting = IndexSet::new();
12
13    // Filter evaluations to exclude layout, rules, config, dependents, options, condition, value
14    let filtered_evaluations: IndexMap<String, IndexSet<String>> = parsed
15        .evaluations
16        .keys()
17        .filter(|key| {
18            !key.contains("/dependents/")
19                && !key.contains("/rules/")
20                && !key.contains("/options/")
21                && !key.contains("/condition/")
22                && !key.contains("/$layout/")
23                && !key.contains("/config/")
24                && !key.contains("/items/")
25                && !key.ends_with("/options")
26                && !key.ends_with("/value")
27                && (key.starts_with("#/$") && !key.contains("/value/"))
28        })
29        .map(|key| {
30            let deps = parsed.dependencies.get(key).cloned().unwrap_or_default();
31            (key.clone(), deps)
32        })
33        .collect();
34
35    // Group table evaluations and merge dependencies
36    let mut table_groups: IndexMap<String, IndexSet<String>> = IndexMap::new();
37    let mut evaluation_to_table: IndexMap<String, String> = IndexMap::new();
38
39    // First pass: identify all table paths from $table keys
40    let mut table_paths: IndexSet<String> = IndexSet::new();
41    for table_key in parsed.tables.keys() {
42        let table_path = table_key.to_string();
43        table_paths.insert(table_path);
44    }
45
46    // Create a mapping of normalized names to table paths
47    let mut normalized_to_table: IndexMap<String, String> = IndexMap::new();
48    for tp in &table_paths {
49        if let Some(last_segment) = tp.rsplit('/').next() {
50            normalized_to_table.insert(last_segment.to_string(), tp.clone());
51        }
52    }
53
54    // Create a mapping from JSON pointer paths to evaluation keys
55    let mut pointer_to_eval: IndexMap<String, String> = IndexMap::new();
56    for eval_key in filtered_evaluations.keys() {
57        let pointer = path_utils::normalize_to_json_pointer(eval_key);
58        pointer_to_eval.insert(pointer, eval_key.clone());
59    }
60    
61    for table_path in &table_paths {
62        let pointer = path_utils::normalize_to_json_pointer(table_path);
63        pointer_to_eval.insert(pointer, table_path.clone());
64    }
65
66    // Second pass: group evaluations by table and merge dependencies
67    for (eval_key, deps) in parsed.evaluations.keys().map(|k| {
68        let deps = parsed.dependencies.get(k).cloned().unwrap_or_default();
69        (k, deps)
70    }) {
71        let table_path_opt = table_paths
72            .iter()
73            .filter(|tp| eval_key.starts_with(tp.as_str()))
74            .max_by_key(|tp| tp.len());
75
76        if let Some(table_path) = table_path_opt {
77            evaluation_to_table.insert(eval_key.clone(), table_path.clone());
78
79            let normalized_deps: IndexSet<String> = deps
80                .iter()
81                .filter_map(|dep| {
82                    if dep.starts_with('$') && !dep.contains('.') && !dep.contains('/') {
83                        return None;
84                    }
85
86                    if let Some(eval_key) = pointer_to_eval.get(dep) {
87                        return Some(eval_key.clone());
88                    }
89
90                    for tp in &table_paths {
91                        let tp_str = tp.as_str();
92                        let tp_with_slash = format!("{}/", tp_str);
93                        if tp_str != table_path.as_str() {
94                            if dep == tp_str || dep.starts_with(&tp_with_slash) {
95                                return Some(tp.clone());
96                            }
97                        }
98                    }
99
100                    if let Some(target_table) = normalized_to_table.get(dep) {
101                        if target_table != table_path {
102                            return Some(target_table.clone());
103                        }
104                    }
105
106                    let table_path_with_slash = format!("{}/", table_path.as_str());
107                    if !dep.starts_with(table_path.as_str())
108                        && !dep.starts_with(&table_path_with_slash)
109                    {
110                        Some(dep.clone())
111                    } else {
112                        None
113                    }
114                })
115                .collect();
116
117            table_groups
118                .entry(table_path.clone())
119                .or_insert_with(IndexSet::new)
120                .extend(normalized_deps);
121        }
122    }
123
124    let mut unified_graph: IndexMap<String, IndexSet<String>> = IndexMap::new();
125
126    for (table_path, deps) in &table_groups {
127        let resolved_deps: IndexSet<String> = deps
128            .iter()
129            .filter_map(|dep| {
130                if dep == table_path {
131                    return None;
132                }
133                if let Some(eval_key) = pointer_to_eval.get(dep) {
134                    if eval_key == table_path {
135                        return None;
136                    }
137                    Some(eval_key.clone())
138                } else {
139                    Some(dep.clone())
140                }
141            })
142            .collect();
143        unified_graph.insert(table_path.clone(), resolved_deps);
144    }
145
146    for (eval_key, deps) in &filtered_evaluations {
147        if !evaluation_to_table.contains_key(eval_key) {
148            let mut normalized_deps: IndexSet<String> = IndexSet::new();
149            
150            for dep in deps {
151                if let Some(eval_key) = pointer_to_eval.get(dep) {
152                    normalized_deps.insert(eval_key.clone());
153                    continue;
154                }
155
156                let mut found_table = false;
157                for tp in &table_paths {
158                    let tp_str = tp.as_str();
159                    let tp_with_slash = format!("{}/", tp_str);
160                    if dep == tp_str || dep.starts_with(&tp_with_slash) {
161                        normalized_deps.insert(tp.clone());
162                        found_table = true;
163                        break;
164                    }
165                }
166                
167                if found_table {
168                    continue;
169                }
170                
171                let dep_as_pointer = path_utils::normalize_to_json_pointer(dep);
172                let dep_as_eval_prefix = format!("#{}", dep_as_pointer);
173                let has_field_evaluations = parsed.evaluations.keys().any(|k| {
174                    k.starts_with(&dep_as_eval_prefix) 
175                    && k.len() > dep_as_eval_prefix.len()
176                    && k[dep_as_eval_prefix.len()..].starts_with('/')
177                });
178                
179                if has_field_evaluations {
180                    for field_eval_key in parsed.evaluations.keys() {
181                        if field_eval_key.starts_with(&dep_as_eval_prefix) 
182                            && field_eval_key.len() > dep_as_eval_prefix.len()
183                            && field_eval_key[dep_as_eval_prefix.len()..].starts_with('/') {
184                            normalized_deps.insert(field_eval_key.clone());
185                        }
186                    }
187                } else {
188                    normalized_deps.insert(dep.clone());
189                }
190            }
191
192            unified_graph.insert(eval_key.clone(), normalized_deps);
193        }
194    }
195
196    let mut table_dependencies = IndexSet::new();
197    for table_path in &table_paths {
198        if let Some(deps) = unified_graph.get(table_path) {
199            collect_transitive_deps(
200                deps,
201                &unified_graph,
202                &table_paths,
203                &mut table_dependencies,
204            );
205        }
206    }
207    
208    let mut expanded = true;
209    while expanded {
210        expanded = false;
211        let current_deps: Vec<String> = table_dependencies.iter().cloned().collect();
212        for dep in &current_deps {
213            if let Some(sub_deps) = unified_graph.get(dep) {
214                for sub_dep in sub_deps {
215                    if !table_paths.contains(sub_dep) {
216                        if table_dependencies.insert(sub_dep.clone()) {
217                            expanded = true;
218                        }
219                    }
220                }
221            }
222        }
223    }
224    
225    let mut phase1_nodes = Vec::new();
226    let mut phase2_nodes = Vec::new();
227    let mut phase3_nodes = Vec::new();
228    
229    for node in unified_graph.keys() {
230        if table_paths.contains(node) {
231            phase2_nodes.push(node.clone());
232        } else if table_dependencies.contains(node) {
233            phase1_nodes.push(node.clone());
234        } else {
235            phase3_nodes.push(node.clone());
236        }
237    }
238    
239    let sort_by_deps = |a: &String, b: &String| {
240        let a_deps = unified_graph.get(a).map(|d| d.len()).unwrap_or(0);
241        let b_deps = unified_graph.get(b).map(|d| d.len()).unwrap_or(0);
242        a_deps.cmp(&b_deps).then_with(|| a.cmp(b))
243    };
244    
245    phase1_nodes.sort_by(sort_by_deps);
246    phase3_nodes.sort_by(sort_by_deps);
247    
248    for node in &phase1_nodes {
249        if !visited.contains(node) {
250            let deps = unified_graph.get(node).cloned().unwrap_or_default();
251            visit_node_parsed(parsed, node, &deps, &unified_graph, &mut visited, &mut visiting, &mut sorted)?;
252        }
253    }
254    
255    phase2_nodes.sort_by(|a, b| {
256        let a_deps = unified_graph.get(a).map(|d| d.len()).unwrap_or(0);
257        let b_deps = unified_graph.get(b).map(|d| d.len()).unwrap_or(0);
258        
259        let a_deps_on_b = unified_graph.get(a)
260            .map(|deps| deps.contains(b))
261            .unwrap_or(false);
262        let b_deps_on_a = unified_graph.get(b)
263            .map(|deps| deps.contains(a))
264            .unwrap_or(false);
265        
266        if a_deps_on_b {
267            std::cmp::Ordering::Greater
268        } else if b_deps_on_a {
269            std::cmp::Ordering::Less
270        } else {
271            a_deps.cmp(&b_deps).then_with(|| a.cmp(b))
272        }
273    });
274    
275    for node in &phase2_nodes {
276        if !visited.contains(node) {
277            let deps = unified_graph.get(node).cloned().unwrap_or_default();
278            visit_node_parsed(parsed, node, &deps, &unified_graph, &mut visited, &mut visiting, &mut sorted)?;
279        }
280    }
281    
282    for node in &phase3_nodes {
283        if !visited.contains(node) {
284            let deps = unified_graph.get(node).cloned().unwrap_or_default();
285            visit_node_parsed(parsed, node, &deps, &unified_graph, &mut visited, &mut visiting, &mut sorted)?;
286        }
287    }
288
289    let batches = compute_parallel_batches(&sorted, &unified_graph, &table_paths);
290    
291    Ok(batches)
292}
293
294fn visit_node_parsed(
295    _parsed: &ParsedSchema,
296    node: &str,
297    deps: &IndexSet<String>,
298    graph: &IndexMap<String, IndexSet<String>>,
299    visited: &mut IndexSet<String>,
300    visiting: &mut IndexSet<String>,
301    sorted: &mut IndexSet<String>,
302) -> Result<(), String> {
303    if visiting.contains(node) {
304        return Err(format!("Circular dependency detected involving: {}", node));
305    }
306
307    if visited.contains(node) {
308        return Ok(());
309    }
310
311    visiting.insert(node.to_string());
312
313    for dep in deps {
314        if let Some(dep_deps) = graph.get(dep) {
315            visit_node_parsed(_parsed, dep, dep_deps, graph, visited, visiting, sorted)?;
316        }
317    }
318
319    visiting.swap_remove(node);
320    visited.insert(node.to_string());
321    sorted.insert(node.to_string());
322
323    Ok(())
324}