// json_eval_rs/topo_sort/parsed.rs
use indexmap::{IndexMap, IndexSet};
4use crate::ParsedSchema;
5use crate::path_utils;
6use crate::topo_sort::common::{compute_parallel_batches, collect_transitive_deps};
7
8pub fn topological_sort_parsed(parsed: &ParsedSchema) -> Result<Vec<Vec<String>>, String> {
9 let mut sorted = IndexSet::new();
10 let mut visited = IndexSet::new();
11 let mut visiting = IndexSet::new();
12
13 let filtered_evaluations: IndexMap<String, IndexSet<String>> = parsed
15 .evaluations
16 .keys()
17 .filter(|key| {
18 !key.contains("/dependents/")
19 && !key.contains("/rules/")
20 && !key.contains("/options/")
21 && !key.contains("/condition/")
22 && !key.contains("/$layout/")
23 && !key.contains("/config/")
24 && !key.contains("/items/")
25 && !key.ends_with("/options")
26 && !key.ends_with("/value")
27 && (key.starts_with("#/$") && !key.contains("/value/"))
28 })
29 .map(|key| {
30 let deps = parsed.dependencies.get(key).cloned().unwrap_or_default();
31 (key.clone(), deps)
32 })
33 .collect();
34
35 let mut table_groups: IndexMap<String, IndexSet<String>> = IndexMap::new();
37 let mut evaluation_to_table: IndexMap<String, String> = IndexMap::new();
38
39 let mut table_paths: IndexSet<String> = IndexSet::new();
41 for table_key in parsed.tables.keys() {
42 let table_path = table_key.to_string();
43 table_paths.insert(table_path);
44 }
45
46 let mut normalized_to_table: IndexMap<String, String> = IndexMap::new();
48 for tp in &table_paths {
49 if let Some(last_segment) = tp.rsplit('/').next() {
50 normalized_to_table.insert(last_segment.to_string(), tp.clone());
51 }
52 }
53
54 let mut pointer_to_eval: IndexMap<String, String> = IndexMap::new();
56 for eval_key in filtered_evaluations.keys() {
57 let pointer = path_utils::normalize_to_json_pointer(eval_key);
58 pointer_to_eval.insert(pointer, eval_key.clone());
59 }
60
61 for table_path in &table_paths {
62 let pointer = path_utils::normalize_to_json_pointer(table_path);
63 pointer_to_eval.insert(pointer, table_path.clone());
64 }
65
66 for (eval_key, deps) in parsed.evaluations.keys().map(|k| {
68 let deps = parsed.dependencies.get(k).cloned().unwrap_or_default();
69 (k, deps)
70 }) {
71 let table_path_opt = table_paths
72 .iter()
73 .filter(|tp| eval_key.starts_with(tp.as_str()))
74 .max_by_key(|tp| tp.len());
75
76 if let Some(table_path) = table_path_opt {
77 evaluation_to_table.insert(eval_key.clone(), table_path.clone());
78
79 let normalized_deps: IndexSet<String> = deps
80 .iter()
81 .filter_map(|dep| {
82 if dep.starts_with('$') && !dep.contains('.') && !dep.contains('/') {
83 return None;
84 }
85
86 if let Some(eval_key) = pointer_to_eval.get(dep) {
87 return Some(eval_key.clone());
88 }
89
90 for tp in &table_paths {
91 let tp_str = tp.as_str();
92 let tp_with_slash = format!("{}/", tp_str);
93 if tp_str != table_path.as_str() {
94 if dep == tp_str || dep.starts_with(&tp_with_slash) {
95 return Some(tp.clone());
96 }
97 }
98 }
99
100 if let Some(target_table) = normalized_to_table.get(dep) {
101 if target_table != table_path {
102 return Some(target_table.clone());
103 }
104 }
105
106 let table_path_with_slash = format!("{}/", table_path.as_str());
107 if !dep.starts_with(table_path.as_str())
108 && !dep.starts_with(&table_path_with_slash)
109 {
110 Some(dep.clone())
111 } else {
112 None
113 }
114 })
115 .collect();
116
117 table_groups
118 .entry(table_path.clone())
119 .or_insert_with(IndexSet::new)
120 .extend(normalized_deps);
121 }
122 }
123
124 let mut unified_graph: IndexMap<String, IndexSet<String>> = IndexMap::new();
125
126 for (table_path, deps) in &table_groups {
127 let resolved_deps: IndexSet<String> = deps
128 .iter()
129 .filter_map(|dep| {
130 if dep == table_path {
131 return None;
132 }
133 if let Some(eval_key) = pointer_to_eval.get(dep) {
134 if eval_key == table_path {
135 return None;
136 }
137 Some(eval_key.clone())
138 } else {
139 Some(dep.clone())
140 }
141 })
142 .collect();
143 unified_graph.insert(table_path.clone(), resolved_deps);
144 }
145
146 for (eval_key, deps) in &filtered_evaluations {
147 if !evaluation_to_table.contains_key(eval_key) {
148 let mut normalized_deps: IndexSet<String> = IndexSet::new();
149
150 for dep in deps {
151 if let Some(eval_key) = pointer_to_eval.get(dep) {
152 normalized_deps.insert(eval_key.clone());
153 continue;
154 }
155
156 let mut found_table = false;
157 for tp in &table_paths {
158 let tp_str = tp.as_str();
159 let tp_with_slash = format!("{}/", tp_str);
160 if dep == tp_str || dep.starts_with(&tp_with_slash) {
161 normalized_deps.insert(tp.clone());
162 found_table = true;
163 break;
164 }
165 }
166
167 if found_table {
168 continue;
169 }
170
171 let dep_as_pointer = path_utils::normalize_to_json_pointer(dep);
172 let dep_as_eval_prefix = format!("#{}", dep_as_pointer);
173 let has_field_evaluations = parsed.evaluations.keys().any(|k| {
174 k.starts_with(&dep_as_eval_prefix)
175 && k.len() > dep_as_eval_prefix.len()
176 && k[dep_as_eval_prefix.len()..].starts_with('/')
177 });
178
179 if has_field_evaluations {
180 for field_eval_key in parsed.evaluations.keys() {
181 if field_eval_key.starts_with(&dep_as_eval_prefix)
182 && field_eval_key.len() > dep_as_eval_prefix.len()
183 && field_eval_key[dep_as_eval_prefix.len()..].starts_with('/') {
184 normalized_deps.insert(field_eval_key.clone());
185 }
186 }
187 } else {
188 normalized_deps.insert(dep.clone());
189 }
190 }
191
192 unified_graph.insert(eval_key.clone(), normalized_deps);
193 }
194 }
195
196 let mut table_dependencies = IndexSet::new();
197 for table_path in &table_paths {
198 if let Some(deps) = unified_graph.get(table_path) {
199 collect_transitive_deps(
200 deps,
201 &unified_graph,
202 &table_paths,
203 &mut table_dependencies,
204 );
205 }
206 }
207
208 let mut expanded = true;
209 while expanded {
210 expanded = false;
211 let current_deps: Vec<String> = table_dependencies.iter().cloned().collect();
212 for dep in ¤t_deps {
213 if let Some(sub_deps) = unified_graph.get(dep) {
214 for sub_dep in sub_deps {
215 if !table_paths.contains(sub_dep) {
216 if table_dependencies.insert(sub_dep.clone()) {
217 expanded = true;
218 }
219 }
220 }
221 }
222 }
223 }
224
225 let mut phase1_nodes = Vec::new();
226 let mut phase2_nodes = Vec::new();
227 let mut phase3_nodes = Vec::new();
228
229 for node in unified_graph.keys() {
230 if table_paths.contains(node) {
231 phase2_nodes.push(node.clone());
232 } else if table_dependencies.contains(node) {
233 phase1_nodes.push(node.clone());
234 } else {
235 phase3_nodes.push(node.clone());
236 }
237 }
238
239 let sort_by_deps = |a: &String, b: &String| {
240 let a_deps = unified_graph.get(a).map(|d| d.len()).unwrap_or(0);
241 let b_deps = unified_graph.get(b).map(|d| d.len()).unwrap_or(0);
242 a_deps.cmp(&b_deps).then_with(|| a.cmp(b))
243 };
244
245 phase1_nodes.sort_by(sort_by_deps);
246 phase3_nodes.sort_by(sort_by_deps);
247
248 for node in &phase1_nodes {
249 if !visited.contains(node) {
250 let deps = unified_graph.get(node).cloned().unwrap_or_default();
251 visit_node_parsed(parsed, node, &deps, &unified_graph, &mut visited, &mut visiting, &mut sorted)?;
252 }
253 }
254
255 phase2_nodes.sort_by(|a, b| {
256 let a_deps = unified_graph.get(a).map(|d| d.len()).unwrap_or(0);
257 let b_deps = unified_graph.get(b).map(|d| d.len()).unwrap_or(0);
258
259 let a_deps_on_b = unified_graph.get(a)
260 .map(|deps| deps.contains(b))
261 .unwrap_or(false);
262 let b_deps_on_a = unified_graph.get(b)
263 .map(|deps| deps.contains(a))
264 .unwrap_or(false);
265
266 if a_deps_on_b {
267 std::cmp::Ordering::Greater
268 } else if b_deps_on_a {
269 std::cmp::Ordering::Less
270 } else {
271 a_deps.cmp(&b_deps).then_with(|| a.cmp(b))
272 }
273 });
274
275 for node in &phase2_nodes {
276 if !visited.contains(node) {
277 let deps = unified_graph.get(node).cloned().unwrap_or_default();
278 visit_node_parsed(parsed, node, &deps, &unified_graph, &mut visited, &mut visiting, &mut sorted)?;
279 }
280 }
281
282 for node in &phase3_nodes {
283 if !visited.contains(node) {
284 let deps = unified_graph.get(node).cloned().unwrap_or_default();
285 visit_node_parsed(parsed, node, &deps, &unified_graph, &mut visited, &mut visiting, &mut sorted)?;
286 }
287 }
288
289 let batches = compute_parallel_batches(&sorted, &unified_graph, &table_paths);
290
291 Ok(batches)
292}
293
294fn visit_node_parsed(
295 _parsed: &ParsedSchema,
296 node: &str,
297 deps: &IndexSet<String>,
298 graph: &IndexMap<String, IndexSet<String>>,
299 visited: &mut IndexSet<String>,
300 visiting: &mut IndexSet<String>,
301 sorted: &mut IndexSet<String>,
302) -> Result<(), String> {
303 if visiting.contains(node) {
304 return Err(format!("Circular dependency detected involving: {}", node));
305 }
306
307 if visited.contains(node) {
308 return Ok(());
309 }
310
311 visiting.insert(node.to_string());
312
313 for dep in deps {
314 if let Some(dep_deps) = graph.get(dep) {
315 visit_node_parsed(_parsed, dep, dep_deps, graph, visited, visiting, sorted)?;
316 }
317 }
318
319 visiting.swap_remove(node);
320 visited.insert(node.to_string());
321 sorted.insert(node.to_string());
322
323 Ok(())
324}