// flowscope_export/dali_compat.rs

//! Dali (sql-parser-service) compatibility adapter.
//!
//! Maps FlowScope's `AnalyzeResult` to the JSON contract produced by the
//! sql-parser-service lineage extractor:
//!
//! ```json
//! {
//!   "package": "<sql>",
//!   "transforms": [ { "name", "targetTables", "query", "is_union", "refs", "source_tables" } ],
//!   "table_lineage": [ { "transform", "target_tables", "source_tables", "relation" } ]
//! }
//! ```

use std::collections::{BTreeSet, HashMap};

use flowscope_core::{AnalyzeResult, Edge, EdgeType, Node, NodeType, StatementMeta};
use serde::Serialize;

/// Hard cap on the number of column nodes visited while tracing a single
/// target column back to its sources. The backward walk uses an explicit
/// stack plus a `visited` set, so it is already O(V+E); this constant is a
/// safety net against adversarial graphs with absurdly wide column fan-in.
const MAX_TRAVERSAL_NODES: usize = 10_000;

/// Per-statement view over the global graph, mirroring the shape of the
/// legacy `StatementLineage` so the Dali mapping helpers can remain local.
struct StatementView<'a> {
    /// Index of the statement within the analyzed script.
    statement_index: usize,
    /// Statement type string as reported by FlowScope (case is not relied
    /// upon here — consumers uppercase before matching).
    statement_type: &'a str,
    /// Nodes belonging to this statement, borrowed from the global graph.
    nodes: Vec<&'a Node>,
    /// Edges belonging to this statement, borrowed from the global graph.
    edges: Vec<&'a Edge>,
}

// ── Public API ──────────────────────────────────────────────────

36/// Convert an `AnalyzeResult` into the Dali-compatible JSON string.
37///
38/// Serialisation is effectively infallible for `DaliOutput` (no custom
39/// `Serialize` impls, no non-string map keys), but we surface the error
40/// rather than panic so library consumers can handle it uniformly.
41pub fn export_dali_compat(result: &AnalyzeResult, sql: &str) -> Result<String, serde_json::Error> {
42    let output = build_dali_output(result, sql);
43    serde_json::to_string_pretty(&output)
44}
45
46/// Convert an `AnalyzeResult` into the Dali-compatible JSON string (compact).
47pub fn export_dali_compat_compact(
48    result: &AnalyzeResult,
49    sql: &str,
50) -> Result<String, serde_json::Error> {
51    let output = build_dali_output(result, sql);
52    serde_json::to_string(&output)
53}
54
// ── Serialisable types matching the Dali contract ───────────────

/// Top-level document matching the Dali (sql-parser-service) JSON contract.
#[derive(Debug, Serialize, serde::Deserialize)]
pub struct DaliOutput {
    /// The full original SQL text ("package" in Dali terms).
    pub package: String,
    /// One entry per statement that writes to at least one table.
    pub transforms: Vec<DaliTransform>,
    /// Table-level lineage rows, one per transform.
    pub table_lineage: Vec<DaliTableLineage>,
}

/// One write-producing statement, in the Dali `transforms` shape.
#[derive(Debug, Serialize, serde::Deserialize)]
pub struct DaliTransform {
    /// Synthetic transform name (`<TYPE>:<statement_index>`).
    pub name: String,
    /// Tables written by this statement; serialised as `targetTables`.
    #[serde(rename = "targetTables")]
    pub target_tables: Vec<String>,
    /// Per-statement SQL text — always empty here, since FlowScope does not
    /// retain per-statement source text.
    pub query: String,
    /// UNION flag from the legacy contract — always `false` in this adapter.
    pub is_union: bool,
    /// Column-level lineage refs for the target columns.
    pub refs: Vec<DaliRef>,
    /// Tables read by this statement.
    pub source_tables: Vec<String>,
}

/// Column-level lineage for a single target column.
#[derive(Debug, Serialize, serde::Deserialize)]
pub struct DaliRef {
    /// Name of the written (target) column.
    pub target_column: String,
    /// Source columns feeding the target, one entry per distinct source.
    pub source_columns: Vec<DaliSourceColumn>,
}

/// A single source column plus the expression through which it is consumed.
#[derive(Debug, Serialize, serde::Deserialize)]
pub struct DaliSourceColumn {
    /// SQL expression applied to the source column; falls back to the bare
    /// column name when the lineage edge carries no expression.
    pub expression: String,
    /// Fully-qualified source column names (`table.column`).
    pub columns: Vec<String>,
}

/// One table-level lineage row, parallel to a `DaliTransform`.
#[derive(Debug, Serialize, serde::Deserialize)]
pub struct DaliTableLineage {
    /// Name of the transform this row belongs to.
    pub transform: String,
    /// Tables written by the statement.
    pub target_tables: Vec<String>,
    /// Tables read by the statement.
    pub source_tables: Vec<String>,
    /// Relation kind, e.g. `INSERT_SELECT`, `VIEW_SELECT`, `MERGE`.
    pub relation: String,
}

// ── Mapping logic ───────────────────────────────────────────────

97fn build_dali_output(result: &AnalyzeResult, sql: &str) -> DaliOutput {
98    let mut transforms = Vec::new();
99    let mut table_lineage = Vec::new();
100
101    for stmt in &result.statements {
102        let view = statement_view(result, stmt);
103        let written_tables = collect_written_tables(&view);
104        if written_tables.is_empty() {
105            continue;
106        }
107
108        let source_tables = collect_source_tables(&view, &written_tables);
109        let refs = build_refs(&view);
110        let relation = map_relation(view.statement_type);
111        let name = build_transform_name(&view);
112
113        transforms.push(DaliTransform {
114            name: name.clone(),
115            target_tables: written_tables.to_vec(),
116            query: String::new(), // FlowScope doesn't store per-statement SQL text
117            is_union: false,
118            refs,
119            source_tables: source_tables.iter().cloned().collect(),
120        });
121
122        table_lineage.push(DaliTableLineage {
123            transform: name,
124            target_tables: written_tables.into_iter().collect(),
125            source_tables: source_tables.into_iter().collect(),
126            relation,
127        });
128    }
129
130    DaliOutput {
131        package: sql.to_string(),
132        transforms,
133        table_lineage,
134    }
135}
136
137/// Build a statement-scoped view over the flattened global graph.
138fn statement_view<'a>(result: &'a AnalyzeResult, stmt: &'a StatementMeta) -> StatementView<'a> {
139    StatementView {
140        statement_index: stmt.statement_index,
141        statement_type: stmt.statement_type.as_str(),
142        nodes: result.nodes_in_statement(stmt.statement_index).collect(),
143        edges: result.edges_in_statement(stmt.statement_index).collect(),
144    }
145}
146
147/// Collect tables that are written to.
148///
149/// Runs three heuristics in order, each handling a category of DML/DDL
150/// pattern. Helpers are split out so each pass can be read — and tested —
151/// independently.
152///
153/// A table is "written" if it:
154/// 1. Has a DataFlow edge pointing TO it (e.g., INSERT..SELECT), OR
155/// 2. Owns columns that are targets of DataFlow edges from columns owned by
156///    other relations (e.g., UPDATE SET col = subquery, MERGE with USING), OR
157/// 3. Is the sole Ownership-only table for a DML statement where FlowScope
158///    did not emit any DataFlow edge at the table level (pure UPDATE/DELETE).
159fn collect_written_tables(stmt: &StatementView<'_>) -> Vec<String> {
160    // Only consider actual Table/View nodes as potential write targets,
161    // not CTEs (which are intermediate relations like USING subquery aliases).
162    let table_nodes: Vec<&Node> = stmt
163        .nodes
164        .iter()
165        .copied()
166        .filter(|n| n.node_type.is_table_or_view())
167        .collect();
168
169    let mut written = BTreeSet::new();
170    written.extend(tables_with_incoming_dataflow(stmt, &table_nodes));
171    written.extend(tables_with_cross_owner_column_dataflow(stmt, &table_nodes));
172
173    if written.is_empty() {
174        if let Some(name) = dml_ownership_only_target(stmt, &table_nodes) {
175            written.insert(name);
176        }
177    }
178
179    written.into_iter().collect()
180}
181
182/// Heuristic 1: Tables that are the destination of a table-level DataFlow edge
183/// (e.g., `INSERT INTO t SELECT ...`, `CREATE TABLE t AS SELECT ...`).
184fn tables_with_incoming_dataflow(
185    stmt: &StatementView<'_>,
186    table_nodes: &[&Node],
187) -> BTreeSet<String> {
188    let mut written = BTreeSet::new();
189    for node in table_nodes {
190        let is_target = stmt
191            .edges
192            .iter()
193            .any(|edge| edge.to == node.id && edge.edge_type == EdgeType::DataFlow);
194        if is_target {
195            written.insert(relation_display_name(node));
196        }
197    }
198    written
199}
200
201/// Heuristic 2: Tables that own columns receiving DataFlow from columns owned
202/// by a *different* relation. Covers patterns like `UPDATE t SET col = other.x`
203/// or `MERGE ... USING src` where the write shows up at the column level only.
204fn tables_with_cross_owner_column_dataflow(
205    stmt: &StatementView<'_>,
206    table_nodes: &[&Node],
207) -> BTreeSet<String> {
208    let mut col_owner: HashMap<&str, &str> = HashMap::new();
209    for edge in &stmt.edges {
210        if edge.edge_type == EdgeType::Ownership && table_nodes.iter().any(|t| t.id == edge.from) {
211            col_owner.insert(edge.to.as_ref(), edge.from.as_ref());
212        }
213    }
214
215    let column_ids: BTreeSet<&str> = stmt
216        .nodes
217        .iter()
218        .copied()
219        .filter(|n| n.node_type == NodeType::Column)
220        .map(|n| n.id.as_ref())
221        .collect();
222
223    let mut written = BTreeSet::new();
224    for edge in &stmt.edges {
225        if edge.edge_type != EdgeType::DataFlow {
226            continue;
227        }
228        let from_is_col = column_ids.contains(edge.from.as_ref());
229        let to_is_col = column_ids.contains(edge.to.as_ref());
230        if !(from_is_col && to_is_col) {
231            continue;
232        }
233        let from_owner = col_owner.get(edge.from.as_ref());
234        let to_owner = col_owner.get(edge.to.as_ref());
235        if let (Some(&from_tbl), Some(&to_tbl)) = (from_owner, to_owner) {
236            if from_tbl != to_tbl {
237                if let Some(tbl_node) = table_nodes.iter().find(|t| t.id.as_ref() == to_tbl) {
238                    written.insert(relation_display_name(tbl_node));
239                }
240            }
241        }
242    }
243    written
244}
245
246/// Heuristic 3: DML fallback. For MERGE/UPDATE/DELETE statements, the target
247/// table may only have Ownership edges — columns exist but no DataFlow edges
248/// connect them (e.g., `DELETE FROM t WHERE id IN (...)` with no column
249/// transformation). Pick the first such table as the write target.
250fn dml_ownership_only_target(stmt: &StatementView<'_>, table_nodes: &[&Node]) -> Option<String> {
251    let stmt_upper = stmt.statement_type.to_uppercase();
252    if !matches!(stmt_upper.as_str(), "MERGE" | "UPDATE" | "DELETE") {
253        return None;
254    }
255    for node in table_nodes {
256        let has_ownership = stmt
257            .edges
258            .iter()
259            .any(|e| e.from == node.id && e.edge_type == EdgeType::Ownership);
260        let has_dataflow = stmt
261            .edges
262            .iter()
263            .any(|e| (e.from == node.id || e.to == node.id) && e.edge_type == EdgeType::DataFlow);
264        if has_ownership && !has_dataflow {
265            return Some(relation_display_name(node));
266        }
267    }
268    None
269}
270
271/// Preferred display name for a relation node (qualified name when present,
272/// otherwise the node label).
273fn relation_display_name(node: &Node) -> String {
274    node.qualified_name
275        .as_deref()
276        .unwrap_or(&node.label)
277        .to_string()
278}
279
280/// Collect source tables (read from, excluding written tables).
281fn collect_source_tables(stmt: &StatementView<'_>, written: &[String]) -> BTreeSet<String> {
282    let written_set: BTreeSet<&str> = written.iter().map(|s| s.as_str()).collect();
283    let mut sources = BTreeSet::new();
284
285    for node in stmt.nodes.iter().copied() {
286        if !matches!(node.node_type, NodeType::Table | NodeType::View) {
287            continue;
288        }
289        let name = node
290            .qualified_name
291            .as_deref()
292            .unwrap_or(&node.label)
293            .to_string();
294        if written_set.contains(name.as_str()) {
295            continue;
296        }
297        // Include if it has outgoing DataFlow edges (is read from)
298        // or is referenced but not written (external source)
299        let is_source = stmt
300            .edges
301            .iter()
302            .any(|edge| edge.from == node.id && edge.edge_type == EdgeType::DataFlow);
303        let is_not_written = !stmt
304            .edges
305            .iter()
306            .any(|edge| edge.to == node.id && edge.edge_type == EdgeType::DataFlow);
307        if is_source || is_not_written {
308            sources.insert(name);
309        }
310    }
311    sources
312}
313
/// Build column-level refs from the lineage graph.
///
/// For each column owned by a target table, trace backward through
/// DataFlow and Derivation edges to find the ultimate source columns
/// (columns owned by source tables).
fn build_refs(stmt: &StatementView<'_>) -> Vec<DaliRef> {
    // All column nodes in this statement.
    let column_nodes: Vec<&Node> = stmt
        .nodes
        .iter()
        .copied()
        .filter(|n| n.node_type == NodeType::Column)
        .collect();

    // All relation-like nodes (tables/views/CTEs plus SELECT outputs) that
    // can own columns via Ownership edges.
    let relation_nodes: Vec<&Node> = stmt
        .nodes
        .iter()
        .copied()
        .filter(|n| n.node_type.is_table_like() || n.node_type == NodeType::Output)
        .collect();

    // Map column_id -> owning relation (table/view/cte/output) qualified name
    let mut column_owner: HashMap<&str, (&str, bool)> = HashMap::new(); // col_id -> (table_name, is_source_table)
    for edge in &stmt.edges {
        if edge.edge_type == EdgeType::Ownership {
            if let Some(rel) = relation_nodes.iter().find(|n| n.id == edge.from) {
                let name = rel.qualified_name.as_deref().unwrap_or(&rel.label);
                // "Source" here means a real table/view that nothing flows
                // INTO — i.e. it is not itself a write target.
                let is_source = matches!(rel.node_type, NodeType::Table | NodeType::View)
                    && !stmt
                        .edges
                        .iter()
                        .any(|e| e.to == rel.id && e.edge_type == EdgeType::DataFlow);
                column_owner.insert(edge.to.as_ref(), (name, is_source));
            }
        }
    }

    // Identify target table nodes (tables/views with DataFlow edges TO them)
    let target_table_ids: BTreeSet<&str> = stmt
        .edges
        .iter()
        .filter(|e| e.edge_type == EdgeType::DataFlow)
        .filter(|e| {
            relation_nodes
                .iter()
                .any(|n| n.id == e.to && n.node_type.is_table_like())
        })
        .map(|e| e.to.as_ref())
        .collect();

    // Target columns = columns owned by target tables
    let target_column_ids: BTreeSet<&str> = stmt
        .edges
        .iter()
        .filter(|e| {
            e.edge_type == EdgeType::Ownership && target_table_ids.contains(e.from.as_ref())
        })
        .map(|e| e.to.as_ref())
        .collect();

    // Build adjacency: for each column, which columns flow INTO it along with
    // the incoming edge's SQL expression (if any). The expression describes
    // how the predecessor column is consumed — we propagate it to the emitted
    // DaliSourceColumn so downstream consumers see the real transformation
    // rather than just the column name.
    let mut incoming: HashMap<&str, Vec<(&str, Option<&str>)>> = HashMap::new();
    for edge in &stmt.edges {
        if matches!(edge.edge_type, EdgeType::Derivation | EdgeType::DataFlow) {
            let from_is_col = column_nodes.iter().any(|c| c.id == edge.from);
            let to_is_col = column_nodes.iter().any(|c| c.id == edge.to);
            if from_is_col && to_is_col {
                incoming
                    .entry(edge.to.as_ref())
                    .or_default()
                    .push((edge.from.as_ref(), edge.expression.as_deref()));
            }
        }
    }

    // For each target column, trace backward to find source table columns
    let mut refs: Vec<DaliRef> = Vec::new();
    // Deduplicate target columns by display label so each name yields at
    // most one ref.
    let mut seen_labels = BTreeSet::new();

    for col in &column_nodes {
        if !target_column_ids.contains(col.id.as_ref()) {
            continue;
        }
        if !seen_labels.insert(col.label.to_string()) {
            continue;
        }

        // (table, column, expression) — expression is the edge expression
        // closest to the source column, falling back to the column name.
        let mut sources: Vec<(String, String, Option<String>)> = Vec::new();
        let mut visited = BTreeSet::new();
        let mut stack: Vec<&str> = vec![col.id.as_ref()];

        // Depth-first backward walk over the incoming-edge adjacency.
        while let Some(current) = stack.pop() {
            if !visited.insert(current) {
                continue;
            }
            // Defensive upper bound on traversal. In practice the `visited`
            // set keeps work O(columns), but a hard cap guards against
            // adversarial or pathological graphs.
            if visited.len() > MAX_TRAVERSAL_NODES {
                break;
            }
            if let Some(predecessors) = incoming.get(current) {
                for &(pred, edge_expr) in predecessors {
                    if let Some(&(table_name, is_source)) = column_owner.get(pred) {
                        if is_source {
                            if let Some(pred_node) =
                                column_nodes.iter().find(|c| c.id.as_ref() == pred)
                            {
                                sources.push((
                                    table_name.to_string(),
                                    pred_node.label.to_string(),
                                    edge_expr.map(str::to_string),
                                ));
                            }
                        } else {
                            // Intermediate column (output/CTE) — keep tracing
                            stack.push(pred);
                        }
                    } else {
                        // No owner found — keep tracing
                        stack.push(pred);
                    }
                }
            }
        }

        // Target columns with no traceable source produce no ref at all.
        if sources.is_empty() {
            continue;
        }

        // Deduplicate by qualified name; the first occurrence's expression
        // wins (BTreeSet iteration keeps output stable across runs).
        let mut source_columns = Vec::new();
        let mut seen_src = BTreeSet::new();
        for (table, column, expression) in &sources {
            let qualified = format!("{table}.{column}");
            if seen_src.insert(qualified.clone()) {
                source_columns.push(DaliSourceColumn {
                    expression: expression.clone().unwrap_or_else(|| column.clone()),
                    columns: vec![qualified],
                });
            }
        }

        refs.push(DaliRef {
            target_column: col.label.to_string(),
            source_columns,
        });
    }

    refs
}

/// Map a FlowScope statement type onto the Dali `relation` vocabulary.
///
/// Matching is case-insensitive; unrecognised statement types pass through
/// uppercased.
fn map_relation(statement_type: &str) -> String {
    let upper = statement_type.to_uppercase();
    let relation = match upper.as_str() {
        "INSERT" => "INSERT_SELECT",
        "CREATE VIEW" | "CREATE_VIEW" => "VIEW_SELECT",
        "MERGE" => "MERGE",
        "UPDATE" => "UPDATE",
        "DELETE" => "DELETE",
        "CREATE TABLE" | "CREATE_TABLE" | "CREATE_TABLE_AS" | "CREATE TABLE AS" => "TABLE_SELECT",
        other => other,
    };
    relation.to_string()
}

487/// Build a transform name from the statement.
488fn build_transform_name(stmt: &StatementView<'_>) -> String {
489    let stmt_type = stmt.statement_type.to_uppercase();
490    format!("{}:{}", stmt_type, stmt.statement_index)
491}
492
#[cfg(test)]
mod tests {
    use super::*;
    use flowscope_core::{analyze, AnalyzeRequest, Dialect};

    /// Analyze `sql` with the Oracle dialect and all optional request
    /// fields left unset.
    fn analyze_oracle(sql: &str) -> AnalyzeResult {
        let request = AnalyzeRequest {
            sql: sql.to_string(),
            files: None,
            dialect: Dialect::Oracle,
            source_name: None,
            options: None,
            schema: None,
            #[cfg(feature = "templating")]
            template_config: None,
        };
        analyze(&request)
    }

    #[test]
    fn insert_select_produces_transform() {
        let sql = "INSERT INTO target_table (col1, col2) SELECT a, b FROM source_table";
        let result = analyze_oracle(sql);
        let output = build_dali_output(&result, sql);

        assert_eq!(output.transforms.len(), 1);
        assert_eq!(output.table_lineage.len(), 1);

        let t = &output.transforms[0];
        assert_eq!(t.target_tables, vec!["TARGET_TABLE"]);
        assert!(t.source_tables.contains(&"SOURCE_TABLE".to_string()));
        assert_eq!(output.table_lineage[0].relation, "INSERT_SELECT");
    }

    #[test]
    fn create_view_produces_view_select_relation() {
        let sql = "CREATE VIEW my_view AS SELECT id, name FROM base_table";
        let result = analyze_oracle(sql);
        let output = build_dali_output(&result, sql);

        assert_eq!(output.transforms.len(), 1);
        let t = &output.transforms[0];
        assert_eq!(t.target_tables, vec!["MY_VIEW"]);
        assert!(t.source_tables.contains(&"BASE_TABLE".to_string()));
        assert_eq!(output.table_lineage[0].relation, "VIEW_SELECT");
    }

    #[test]
    fn refs_contain_column_level_mappings() {
        let sql = "INSERT INTO tgt (x, y) SELECT a, b FROM src";
        let result = analyze_oracle(sql);
        let output = build_dali_output(&result, sql);

        let t = &output.transforms[0];
        assert!(!t.refs.is_empty(), "refs should not be empty");

        // FlowScope labels target columns with SELECT output names
        let ref_targets: Vec<&str> = t.refs.iter().map(|r| r.target_column.as_str()).collect();
        assert_eq!(
            ref_targets.len(),
            2,
            "should have 2 refs, got {ref_targets:?}"
        );

        // Check source columns reference the source table
        for r in &t.refs {
            for sc in &r.source_columns {
                assert!(
                    sc.columns
                        .iter()
                        .any(|c| c.to_uppercase().starts_with("SRC.")),
                    "source column should reference SRC table, got {:?}",
                    sc.columns
                );
            }
        }
    }

    #[test]
    fn update_produces_correct_relation() {
        let sql = "UPDATE target_table SET col1 = src.val FROM source_table src WHERE target_table.id = src.id";
        let result = analyze_oracle(sql);
        let output = build_dali_output(&result, sql);

        // NOTE(review): UPDATE ... FROM may not parse under the Oracle
        // dialect, hence the guard — assert only when lineage was produced.
        if !output.table_lineage.is_empty() {
            assert_eq!(output.table_lineage[0].relation, "UPDATE");
        }
    }

    #[test]
    fn merge_produces_correct_relation() {
        let sql = "MERGE INTO tgt USING src ON (tgt.id = src.id) WHEN MATCHED THEN UPDATE SET tgt.val = src.val WHEN NOT MATCHED THEN INSERT (id, val) VALUES (src.id, src.val)";
        let result = analyze_oracle(sql);
        let output = build_dali_output(&result, sql);

        assert!(!output.transforms.is_empty());
        assert_eq!(output.table_lineage[0].relation, "MERGE");
    }

    #[test]
    fn delete_produces_correct_relation() {
        let sql = "DELETE FROM target_table WHERE id IN (SELECT id FROM src)";
        let result = analyze_oracle(sql);
        let output = build_dali_output(&result, sql);

        if !output.table_lineage.is_empty() {
            assert_eq!(output.table_lineage[0].relation, "DELETE");
        }
    }

    #[test]
    fn standalone_select_produces_no_transform() {
        let sql = "SELECT a, b FROM some_table";
        let result = analyze_oracle(sql);
        let output = build_dali_output(&result, sql);

        // Standalone SELECT has no target table, so no transforms
        assert!(output.transforms.is_empty());
        assert!(output.table_lineage.is_empty());
    }

    #[test]
    fn package_contains_original_sql() {
        let sql = "INSERT INTO t (c) SELECT c FROM s";
        let result = analyze_oracle(sql);
        let output = build_dali_output(&result, sql);

        assert_eq!(output.package, sql);
    }

    #[test]
    fn output_is_valid_json() {
        let sql = "INSERT INTO t (c) SELECT c FROM s";
        let result = analyze_oracle(sql);
        let json_str = export_dali_compat(&result, sql).expect("export should succeed");

        let parsed: serde_json::Value =
            serde_json::from_str(&json_str).expect("output should be valid JSON");
        assert!(parsed.get("package").is_some());
        assert!(parsed.get("transforms").is_some());
        assert!(parsed.get("table_lineage").is_some());
    }

    #[test]
    fn compact_output_has_no_newlines_in_values() {
        let sql = "INSERT INTO t (c) SELECT c FROM s";
        let result = analyze_oracle(sql);
        let json_str = export_dali_compat_compact(&result, sql).expect("export should succeed");

        // Compact JSON should be a single line (no pretty printing)
        assert!(
            !json_str.contains("\n  "),
            "compact output should not be indented"
        );
    }

    #[test]
    fn source_columns_have_expression_and_columns() {
        let sql = "INSERT INTO tgt (x) SELECT a FROM src";
        let result = analyze_oracle(sql);
        let output = build_dali_output(&result, sql);

        for t in &output.transforms {
            for r in &t.refs {
                for sc in &r.source_columns {
                    assert!(!sc.expression.is_empty(), "expression should not be empty");
                    assert!(!sc.columns.is_empty(), "columns should not be empty");
                }
            }
        }
    }
}