1use std::collections::{BTreeSet, HashMap};
15
16use flowscope_core::{AnalyzeResult, Edge, EdgeType, Node, NodeType, StatementMeta};
17use serde::Serialize;
18
/// Cap on the number of distinct column ids `build_refs` will visit while
/// walking backwards from a single target column; once the visited set grows
/// past this value the walk for that column stops.
const MAX_TRAVERSAL_NODES: usize = 10_000;
24
/// Borrowed, per-statement slice of an `AnalyzeResult`, assembled by
/// `statement_view` so the helpers in this file can work on one statement
/// at a time.
struct StatementView<'a> {
    /// Copied from `StatementMeta::statement_index`.
    statement_index: usize,
    /// Statement type string borrowed from the statement metadata.
    statement_type: &'a str,
    /// Nodes yielded by `AnalyzeResult::nodes_in_statement` for this statement.
    nodes: Vec<&'a Node>,
    /// Edges yielded by `AnalyzeResult::edges_in_statement` for this statement.
    edges: Vec<&'a Edge>,
}
33
34pub fn export_dali_compat(result: &AnalyzeResult, sql: &str) -> Result<String, serde_json::Error> {
42 let output = build_dali_output(result, sql);
43 serde_json::to_string_pretty(&output)
44}
45
46pub fn export_dali_compat_compact(
48 result: &AnalyzeResult,
49 sql: &str,
50) -> Result<String, serde_json::Error> {
51 let output = build_dali_output(result, sql);
52 serde_json::to_string(&output)
53}
54
/// Root of the exported document.
#[derive(Debug, Serialize, serde::Deserialize)]
pub struct DaliOutput {
    /// The SQL text handed to the exporter, stored verbatim.
    pub package: String,
    /// One entry per statement for which at least one written table was found.
    pub transforms: Vec<DaliTransform>,
    /// Table-level lineage; `build_dali_output` pushes one entry alongside
    /// every transform, so the two vectors line up index by index.
    pub table_lineage: Vec<DaliTableLineage>,
}
63
/// A single statement that writes to one or more tables, with its
/// column-level references.
#[derive(Debug, Serialize, serde::Deserialize)]
pub struct DaliTransform {
    /// Identifier produced by `build_transform_name`
    /// (`<UPPERCASED STATEMENT TYPE>:<statement index>`).
    pub name: String,
    /// Display names of the tables the statement writes to.
    ///
    /// Serialized under the key `targetTables`.
    // NOTE(review): every other field in this file serializes in snake_case
    // (including `DaliTableLineage::target_tables`); confirm the camelCase
    // key here is what the consumer expects.
    #[serde(rename = "targetTables")]
    pub target_tables: Vec<String>,
    /// Always an empty string as populated by `build_dali_output`.
    pub query: String,
    /// Always `false` as populated by `build_dali_output`.
    pub is_union: bool,
    /// Column-level mappings produced by `build_refs`.
    pub refs: Vec<DaliRef>,
    /// Display names of the tables the statement reads from.
    pub source_tables: Vec<String>,
}
74
/// Column-level lineage for one target column.
#[derive(Debug, Serialize, serde::Deserialize)]
pub struct DaliRef {
    /// Label of the target column node.
    pub target_column: String,
    /// Source columns found by walking the column graph backwards from the
    /// target column; never empty for refs produced by `build_refs`.
    pub source_columns: Vec<DaliSourceColumn>,
}
80
/// One source contribution to a target column.
#[derive(Debug, Serialize, serde::Deserialize)]
pub struct DaliSourceColumn {
    /// Expression text taken from the graph edge when the edge carries one;
    /// otherwise the bare source column label.
    pub expression: String,
    /// Qualified source column names in `table.column` form. `build_refs`
    /// emits exactly one element per entry.
    pub columns: Vec<String>,
}
86
/// Table-level lineage record for one transform.
#[derive(Debug, Serialize, serde::Deserialize)]
pub struct DaliTableLineage {
    /// Name of the transform this record belongs to (same value as
    /// `DaliTransform::name`).
    pub transform: String,
    /// Display names of the tables written by the statement.
    pub target_tables: Vec<String>,
    /// Display names of the tables read by the statement.
    pub source_tables: Vec<String>,
    /// Relation kind derived from the statement type by `map_relation`.
    pub relation: String,
}
94
95fn build_dali_output(result: &AnalyzeResult, sql: &str) -> DaliOutput {
98 let mut transforms = Vec::new();
99 let mut table_lineage = Vec::new();
100
101 for stmt in &result.statements {
102 let view = statement_view(result, stmt);
103 let written_tables = collect_written_tables(&view);
104 if written_tables.is_empty() {
105 continue;
106 }
107
108 let source_tables = collect_source_tables(&view, &written_tables);
109 let refs = build_refs(&view);
110 let relation = map_relation(view.statement_type);
111 let name = build_transform_name(&view);
112
113 transforms.push(DaliTransform {
114 name: name.clone(),
115 target_tables: written_tables.to_vec(),
116 query: String::new(), is_union: false,
118 refs,
119 source_tables: source_tables.iter().cloned().collect(),
120 });
121
122 table_lineage.push(DaliTableLineage {
123 transform: name,
124 target_tables: written_tables.into_iter().collect(),
125 source_tables: source_tables.into_iter().collect(),
126 relation,
127 });
128 }
129
130 DaliOutput {
131 package: sql.to_string(),
132 transforms,
133 table_lineage,
134 }
135}
136
137fn statement_view<'a>(result: &'a AnalyzeResult, stmt: &'a StatementMeta) -> StatementView<'a> {
139 StatementView {
140 statement_index: stmt.statement_index,
141 statement_type: stmt.statement_type.as_str(),
142 nodes: result.nodes_in_statement(stmt.statement_index).collect(),
143 edges: result.edges_in_statement(stmt.statement_index).collect(),
144 }
145}
146
147fn collect_written_tables(stmt: &StatementView<'_>) -> Vec<String> {
160 let table_nodes: Vec<&Node> = stmt
163 .nodes
164 .iter()
165 .copied()
166 .filter(|n| n.node_type.is_table_or_view())
167 .collect();
168
169 let mut written = BTreeSet::new();
170 written.extend(tables_with_incoming_dataflow(stmt, &table_nodes));
171 written.extend(tables_with_cross_owner_column_dataflow(stmt, &table_nodes));
172
173 if written.is_empty() {
174 if let Some(name) = dml_ownership_only_target(stmt, &table_nodes) {
175 written.insert(name);
176 }
177 }
178
179 written.into_iter().collect()
180}
181
182fn tables_with_incoming_dataflow(
185 stmt: &StatementView<'_>,
186 table_nodes: &[&Node],
187) -> BTreeSet<String> {
188 let mut written = BTreeSet::new();
189 for node in table_nodes {
190 let is_target = stmt
191 .edges
192 .iter()
193 .any(|edge| edge.to == node.id && edge.edge_type == EdgeType::DataFlow);
194 if is_target {
195 written.insert(relation_display_name(node));
196 }
197 }
198 written
199}
200
201fn tables_with_cross_owner_column_dataflow(
205 stmt: &StatementView<'_>,
206 table_nodes: &[&Node],
207) -> BTreeSet<String> {
208 let mut col_owner: HashMap<&str, &str> = HashMap::new();
209 for edge in &stmt.edges {
210 if edge.edge_type == EdgeType::Ownership && table_nodes.iter().any(|t| t.id == edge.from) {
211 col_owner.insert(edge.to.as_ref(), edge.from.as_ref());
212 }
213 }
214
215 let column_ids: BTreeSet<&str> = stmt
216 .nodes
217 .iter()
218 .copied()
219 .filter(|n| n.node_type == NodeType::Column)
220 .map(|n| n.id.as_ref())
221 .collect();
222
223 let mut written = BTreeSet::new();
224 for edge in &stmt.edges {
225 if edge.edge_type != EdgeType::DataFlow {
226 continue;
227 }
228 let from_is_col = column_ids.contains(edge.from.as_ref());
229 let to_is_col = column_ids.contains(edge.to.as_ref());
230 if !(from_is_col && to_is_col) {
231 continue;
232 }
233 let from_owner = col_owner.get(edge.from.as_ref());
234 let to_owner = col_owner.get(edge.to.as_ref());
235 if let (Some(&from_tbl), Some(&to_tbl)) = (from_owner, to_owner) {
236 if from_tbl != to_tbl {
237 if let Some(tbl_node) = table_nodes.iter().find(|t| t.id.as_ref() == to_tbl) {
238 written.insert(relation_display_name(tbl_node));
239 }
240 }
241 }
242 }
243 written
244}
245
246fn dml_ownership_only_target(stmt: &StatementView<'_>, table_nodes: &[&Node]) -> Option<String> {
251 let stmt_upper = stmt.statement_type.to_uppercase();
252 if !matches!(stmt_upper.as_str(), "MERGE" | "UPDATE" | "DELETE") {
253 return None;
254 }
255 for node in table_nodes {
256 let has_ownership = stmt
257 .edges
258 .iter()
259 .any(|e| e.from == node.id && e.edge_type == EdgeType::Ownership);
260 let has_dataflow = stmt
261 .edges
262 .iter()
263 .any(|e| (e.from == node.id || e.to == node.id) && e.edge_type == EdgeType::DataFlow);
264 if has_ownership && !has_dataflow {
265 return Some(relation_display_name(node));
266 }
267 }
268 None
269}
270
271fn relation_display_name(node: &Node) -> String {
274 node.qualified_name
275 .as_deref()
276 .unwrap_or(&node.label)
277 .to_string()
278}
279
280fn collect_source_tables(stmt: &StatementView<'_>, written: &[String]) -> BTreeSet<String> {
282 let written_set: BTreeSet<&str> = written.iter().map(|s| s.as_str()).collect();
283 let mut sources = BTreeSet::new();
284
285 for node in stmt.nodes.iter().copied() {
286 if !matches!(node.node_type, NodeType::Table | NodeType::View) {
287 continue;
288 }
289 let name = node
290 .qualified_name
291 .as_deref()
292 .unwrap_or(&node.label)
293 .to_string();
294 if written_set.contains(name.as_str()) {
295 continue;
296 }
297 let is_source = stmt
300 .edges
301 .iter()
302 .any(|edge| edge.from == node.id && edge.edge_type == EdgeType::DataFlow);
303 let is_not_written = !stmt
304 .edges
305 .iter()
306 .any(|edge| edge.to == node.id && edge.edge_type == EdgeType::DataFlow);
307 if is_source || is_not_written {
308 sources.insert(name);
309 }
310 }
311 sources
312}
313
314fn build_refs(stmt: &StatementView<'_>) -> Vec<DaliRef> {
320 let column_nodes: Vec<&Node> = stmt
321 .nodes
322 .iter()
323 .copied()
324 .filter(|n| n.node_type == NodeType::Column)
325 .collect();
326
327 let relation_nodes: Vec<&Node> = stmt
328 .nodes
329 .iter()
330 .copied()
331 .filter(|n| n.node_type.is_table_like() || n.node_type == NodeType::Output)
332 .collect();
333
334 let mut column_owner: HashMap<&str, (&str, bool)> = HashMap::new(); for edge in &stmt.edges {
337 if edge.edge_type == EdgeType::Ownership {
338 if let Some(rel) = relation_nodes.iter().find(|n| n.id == edge.from) {
339 let name = rel.qualified_name.as_deref().unwrap_or(&rel.label);
340 let is_source = matches!(rel.node_type, NodeType::Table | NodeType::View)
341 && !stmt
342 .edges
343 .iter()
344 .any(|e| e.to == rel.id && e.edge_type == EdgeType::DataFlow);
345 column_owner.insert(edge.to.as_ref(), (name, is_source));
346 }
347 }
348 }
349
350 let target_table_ids: BTreeSet<&str> = stmt
352 .edges
353 .iter()
354 .filter(|e| e.edge_type == EdgeType::DataFlow)
355 .filter(|e| {
356 relation_nodes
357 .iter()
358 .any(|n| n.id == e.to && n.node_type.is_table_like())
359 })
360 .map(|e| e.to.as_ref())
361 .collect();
362
363 let target_column_ids: BTreeSet<&str> = stmt
365 .edges
366 .iter()
367 .filter(|e| {
368 e.edge_type == EdgeType::Ownership && target_table_ids.contains(e.from.as_ref())
369 })
370 .map(|e| e.to.as_ref())
371 .collect();
372
373 let mut incoming: HashMap<&str, Vec<(&str, Option<&str>)>> = HashMap::new();
379 for edge in &stmt.edges {
380 if matches!(edge.edge_type, EdgeType::Derivation | EdgeType::DataFlow) {
381 let from_is_col = column_nodes.iter().any(|c| c.id == edge.from);
382 let to_is_col = column_nodes.iter().any(|c| c.id == edge.to);
383 if from_is_col && to_is_col {
384 incoming
385 .entry(edge.to.as_ref())
386 .or_default()
387 .push((edge.from.as_ref(), edge.expression.as_deref()));
388 }
389 }
390 }
391
392 let mut refs: Vec<DaliRef> = Vec::new();
394 let mut seen_labels = BTreeSet::new();
395
396 for col in &column_nodes {
397 if !target_column_ids.contains(col.id.as_ref()) {
398 continue;
399 }
400 if !seen_labels.insert(col.label.to_string()) {
401 continue;
402 }
403
404 let mut sources: Vec<(String, String, Option<String>)> = Vec::new();
407 let mut visited = BTreeSet::new();
408 let mut stack: Vec<&str> = vec![col.id.as_ref()];
409
410 while let Some(current) = stack.pop() {
411 if !visited.insert(current) {
412 continue;
413 }
414 if visited.len() > MAX_TRAVERSAL_NODES {
418 break;
419 }
420 if let Some(predecessors) = incoming.get(current) {
421 for &(pred, edge_expr) in predecessors {
422 if let Some(&(table_name, is_source)) = column_owner.get(pred) {
423 if is_source {
424 if let Some(pred_node) =
425 column_nodes.iter().find(|c| c.id.as_ref() == pred)
426 {
427 sources.push((
428 table_name.to_string(),
429 pred_node.label.to_string(),
430 edge_expr.map(str::to_string),
431 ));
432 }
433 } else {
434 stack.push(pred);
436 }
437 } else {
438 stack.push(pred);
440 }
441 }
442 }
443 }
444
445 if sources.is_empty() {
446 continue;
447 }
448
449 let mut source_columns = Vec::new();
452 let mut seen_src = BTreeSet::new();
453 for (table, column, expression) in &sources {
454 let qualified = format!("{table}.{column}");
455 if seen_src.insert(qualified.clone()) {
456 source_columns.push(DaliSourceColumn {
457 expression: expression.clone().unwrap_or_else(|| column.clone()),
458 columns: vec![qualified],
459 });
460 }
461 }
462
463 refs.push(DaliRef {
464 target_column: col.label.to_string(),
465 source_columns,
466 });
467 }
468
469 refs
470}
471
/// Maps a statement type (compared after upper-casing) to the relation
/// label used in the export. Statement types without a dedicated mapping
/// are returned upper-cased.
fn map_relation(statement_type: &str) -> String {
    let upper = statement_type.to_uppercase();
    let relation = match upper.as_str() {
        "INSERT" => "INSERT_SELECT",
        "CREATE VIEW" | "CREATE_VIEW" => "VIEW_SELECT",
        "CREATE TABLE" | "CREATE_TABLE" | "CREATE_TABLE_AS" | "CREATE TABLE AS" => "TABLE_SELECT",
        "MERGE" => "MERGE",
        "UPDATE" => "UPDATE",
        "DELETE" => "DELETE",
        unmapped => unmapped,
    };
    relation.to_string()
}
486
487fn build_transform_name(stmt: &StatementView<'_>) -> String {
489 let stmt_type = stmt.statement_type.to_uppercase();
490 format!("{}:{}", stmt_type, stmt.statement_index)
491}
492
#[cfg(test)]
mod tests {
    use super::*;
    use flowscope_core::{analyze, AnalyzeRequest, Dialect};

    /// Analyzes `sql` with the Oracle dialect, leaving every optional
    /// request field as `None`.
    fn analyze_oracle(sql: &str) -> AnalyzeResult {
        analyze(&AnalyzeRequest {
            sql: sql.to_string(),
            files: None,
            dialect: Dialect::Oracle,
            source_name: None,
            options: None,
            schema: None,
            #[cfg(feature = "templating")]
            template_config: None,
        })
    }

    /// Analyzes `sql` and builds the export document from the result.
    fn dali_output_for(sql: &str) -> DaliOutput {
        build_dali_output(&analyze_oracle(sql), sql)
    }

    #[test]
    fn insert_select_produces_transform() {
        let doc =
            dali_output_for("INSERT INTO target_table (col1, col2) SELECT a, b FROM source_table");

        assert_eq!(doc.transforms.len(), 1);
        assert_eq!(doc.table_lineage.len(), 1);

        let transform = &doc.transforms[0];
        assert_eq!(transform.target_tables, vec!["TARGET_TABLE"]);
        assert!(transform
            .source_tables
            .contains(&"SOURCE_TABLE".to_string()));
        assert_eq!(doc.table_lineage[0].relation, "INSERT_SELECT");
    }

    #[test]
    fn create_view_produces_view_select_relation() {
        let doc = dali_output_for("CREATE VIEW my_view AS SELECT id, name FROM base_table");

        assert_eq!(doc.transforms.len(), 1);
        let transform = &doc.transforms[0];
        assert_eq!(transform.target_tables, vec!["MY_VIEW"]);
        assert!(transform.source_tables.contains(&"BASE_TABLE".to_string()));
        assert_eq!(doc.table_lineage[0].relation, "VIEW_SELECT");
    }

    #[test]
    fn refs_contain_column_level_mappings() {
        let doc = dali_output_for("INSERT INTO tgt (x, y) SELECT a, b FROM src");

        let transform = &doc.transforms[0];
        assert!(!transform.refs.is_empty(), "refs should not be empty");

        let ref_targets: Vec<&str> = transform
            .refs
            .iter()
            .map(|r| r.target_column.as_str())
            .collect();
        assert_eq!(
            ref_targets.len(),
            2,
            "should have 2 refs, got {ref_targets:?}"
        );

        // Every recorded source column must come from the SRC table.
        for sc in transform.refs.iter().flat_map(|r| r.source_columns.iter()) {
            assert!(
                sc.columns
                    .iter()
                    .any(|c| c.to_uppercase().starts_with("SRC.")),
                "source column should reference SRC table, got {:?}",
                sc.columns
            );
        }
    }

    #[test]
    fn update_produces_correct_relation() {
        let doc = dali_output_for(
            "UPDATE target_table SET col1 = src.val FROM source_table src WHERE target_table.id = src.id",
        );

        // Only checked when the analyzer produced lineage for this statement.
        if let Some(first) = doc.table_lineage.first() {
            assert_eq!(first.relation, "UPDATE");
        }
    }

    #[test]
    fn merge_produces_correct_relation() {
        let doc = dali_output_for(
            "MERGE INTO tgt USING src ON (tgt.id = src.id) WHEN MATCHED THEN UPDATE SET tgt.val = src.val WHEN NOT MATCHED THEN INSERT (id, val) VALUES (src.id, src.val)",
        );

        assert!(!doc.transforms.is_empty());
        assert_eq!(doc.table_lineage[0].relation, "MERGE");
    }

    #[test]
    fn delete_produces_correct_relation() {
        let doc = dali_output_for("DELETE FROM target_table WHERE id IN (SELECT id FROM src)");

        // Only checked when the analyzer produced lineage for this statement.
        if let Some(first) = doc.table_lineage.first() {
            assert_eq!(first.relation, "DELETE");
        }
    }

    #[test]
    fn standalone_select_produces_no_transform() {
        let doc = dali_output_for("SELECT a, b FROM some_table");

        assert!(doc.transforms.is_empty());
        assert!(doc.table_lineage.is_empty());
    }

    #[test]
    fn package_contains_original_sql() {
        let sql = "INSERT INTO t (c) SELECT c FROM s";
        let doc = dali_output_for(sql);

        assert_eq!(doc.package, sql);
    }

    #[test]
    fn output_is_valid_json() {
        let sql = "INSERT INTO t (c) SELECT c FROM s";
        let analysis = analyze_oracle(sql);
        let rendered = export_dali_compat(&analysis, sql).expect("export should succeed");

        let parsed: serde_json::Value =
            serde_json::from_str(&rendered).expect("output should be valid JSON");
        assert!(parsed.get("package").is_some());
        assert!(parsed.get("transforms").is_some());
        assert!(parsed.get("table_lineage").is_some());
    }

    #[test]
    fn compact_output_has_no_newlines_in_values() {
        let sql = "INSERT INTO t (c) SELECT c FROM s";
        let analysis = analyze_oracle(sql);
        let rendered =
            export_dali_compat_compact(&analysis, sql).expect("export should succeed");

        assert!(
            !rendered.contains("\n "),
            "compact output should not be indented"
        );
    }

    #[test]
    fn source_columns_have_expression_and_columns() {
        let doc = dali_output_for("INSERT INTO tgt (x) SELECT a FROM src");

        let all_source_columns = doc
            .transforms
            .iter()
            .flat_map(|t| t.refs.iter())
            .flat_map(|r| r.source_columns.iter());
        for sc in all_source_columns {
            assert!(!sc.expression.is_empty(), "expression should not be empty");
            assert!(!sc.columns.is_empty(), "columns should not be empty");
        }
    }
}