Skip to main content

flowscope_export/
extract.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap};
2
3use flowscope_core::{AnalyzeResult, EdgeType, NodeType};
4use serde::{Deserialize, Serialize};
5
/// Per-script summary of an analysis result.
///
/// Serialized with camelCase field names (`sourceName`, `statementCount`, ...).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct ScriptInfo {
    /// Name of the script the statements came from. `extract_script_info`
    /// uses `"default"` for statements that carry no source name.
    pub source_name: String,
    /// Number of statements attributed to this script.
    pub statement_count: usize,
    /// Names of tables/views the script reads. `extract_script_info` fills
    /// this sorted and without duplicates.
    pub tables_read: Vec<String>,
    /// Names of tables/views the script writes. `extract_script_info` fills
    /// this sorted and without duplicates.
    pub tables_written: Vec<String>,
}
14
/// Description of one table-like relation and the columns attached to it.
///
/// Serialized with camelCase field names; `table_type` is emitted as `type`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct TableInfo {
    /// Display label of the relation.
    pub name: String,
    /// Qualified name of the relation. `extract_table_info` falls back to the
    /// label when the node has no qualified name.
    pub qualified_name: String,
    /// Kind of relation; serialized under the key `type`.
    #[serde(rename = "type")]
    pub table_type: TableType,
    /// Column labels belonging to the relation.
    pub columns: Vec<String>,
    /// Source the relation was found in; left out of the serialized output
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source_name: Option<String>,
}
26
/// Kind of relation described by a [`TableInfo`].
///
/// Serialized in lowercase (`"table"`, `"view"`, `"cte"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TableType {
    /// A table.
    Table,
    /// A view.
    View,
    /// A common table expression.
    Cte,
}
34
35impl TableType {
36    pub fn as_str(&self) -> &'static str {
37        match self {
38            TableType::Table => "table",
39            TableType::View => "view",
40            TableType::Cte => "cte",
41        }
42    }
43}
44
45impl std::fmt::Display for TableType {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.write_str(self.as_str())
48    }
49}
50
/// One column-to-column lineage link.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct ColumnMapping {
    /// Name of the table owning the source column. `extract_column_mappings`
    /// uses `"Output"` when the column has no owning table.
    pub source_table: String,
    /// Label of the source column.
    pub source_column: String,
    /// Name of the table owning the target column. `extract_column_mappings`
    /// uses `"Output"` when the column has no owning table.
    pub target_table: String,
    /// Label of the target column.
    pub target_column: String,
    /// Expression associated with the link, if any; left out of the
    /// serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expression: Option<String>,
    /// Edge kind as a snake_case label (for example `"data_flow"` or
    /// `"derivation"`).
    pub edge_type: String,
}
62
/// A directed dependency between two relations.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct TableDependency {
    /// Name of the relation at the `from` end of the underlying edge.
    pub source_table: String,
    /// Name of the relation at the `to` end of the underlying edge.
    pub target_table: String,
}
69
70pub fn extract_script_info(result: &AnalyzeResult) -> Vec<ScriptInfo> {
71    let mut script_map: HashMap<String, ScriptInfo> = HashMap::new();
72
73    for stmt in &result.statements {
74        let source_name = stmt
75            .source_name
76            .clone()
77            .unwrap_or_else(|| "default".to_string());
78        let entry = script_map
79            .entry(source_name.clone())
80            .or_insert_with(|| ScriptInfo {
81                source_name: source_name.clone(),
82                statement_count: 0,
83                tables_read: Vec::new(),
84                tables_written: Vec::new(),
85            });
86
87        entry.statement_count += 1;
88
89        let mut tables_read: BTreeSet<String> = entry.tables_read.iter().cloned().collect();
90        let mut tables_written: BTreeSet<String> = entry.tables_written.iter().cloned().collect();
91
92        let stmt_edges: Vec<_> = result.edges_in_statement(stmt.statement_index).collect();
93
94        for node in result.nodes_in_statement(stmt.statement_index) {
95            if matches!(node.node_type, NodeType::Table | NodeType::View) {
96                let is_written = stmt_edges
97                    .iter()
98                    .any(|edge| edge.to == node.id && edge.edge_type == EdgeType::DataFlow);
99                let is_read = stmt_edges
100                    .iter()
101                    .any(|edge| edge.from == node.id && edge.edge_type == EdgeType::DataFlow);
102
103                let table_name = node
104                    .qualified_name
105                    .as_deref()
106                    .unwrap_or(&node.label)
107                    .to_string();
108
109                if is_written {
110                    tables_written.insert(table_name.clone());
111                }
112                // A table is considered "read" if it's explicitly read OR if it's
113                // referenced but not written (implying it's an external/source table)
114                if is_read || !is_written {
115                    tables_read.insert(table_name);
116                }
117            }
118        }
119
120        entry.tables_read = tables_read.into_iter().collect();
121        entry.tables_written = tables_written.into_iter().collect();
122    }
123
124    let mut values: Vec<_> = script_map.into_values().collect();
125    values.sort_by(|a, b| a.source_name.cmp(&b.source_name));
126    values
127}
128
129pub fn extract_table_info(result: &AnalyzeResult) -> Vec<TableInfo> {
130    let mut table_map: BTreeMap<String, TableInfo> = BTreeMap::new();
131
132    let column_labels: HashMap<&str, &str> = result
133        .nodes
134        .iter()
135        .filter(|node| node.node_type == NodeType::Column)
136        .map(|col| (col.id.as_ref(), col.label.as_ref()))
137        .collect();
138
139    for table_node in result.nodes.iter().filter(|n| n.node_type.is_table_like()) {
140        let key = table_node
141            .qualified_name
142            .as_deref()
143            .unwrap_or(&table_node.label)
144            .to_string();
145
146        let columns: BTreeSet<String> = result
147            .edges
148            .iter()
149            .filter(|edge| edge.edge_type == EdgeType::Ownership && edge.from == table_node.id)
150            .filter_map(|edge| column_labels.get(edge.to.as_ref()).map(|s| s.to_string()))
151            .collect();
152
153        let table_type = match table_node.node_type {
154            NodeType::View => TableType::View,
155            NodeType::Cte => TableType::Cte,
156            _ => TableType::Table,
157        };
158
159        // A merged node may participate in multiple statements with different
160        // source names. Pick the source name of the first statement it belongs
161        // to so the result remains deterministic.
162        let source_name = table_node
163            .statement_ids
164            .first()
165            .and_then(|sid| result.statements.iter().find(|s| s.statement_index == *sid))
166            .and_then(|s| s.source_name.clone());
167
168        let entry = table_map.entry(key.clone()).or_insert_with(|| TableInfo {
169            name: table_node.label.to_string(),
170            qualified_name: key.clone(),
171            table_type,
172            columns: Vec::new(),
173            source_name,
174        });
175
176        let mut merged: BTreeSet<String> = entry.columns.iter().cloned().collect();
177        merged.extend(columns);
178        entry.columns = merged.into_iter().collect();
179    }
180
181    table_map.into_values().collect()
182}
183
184pub fn extract_column_mappings(result: &AnalyzeResult) -> Vec<ColumnMapping> {
185    let mut mappings = Vec::new();
186
187    let table_nodes: Vec<_> = result
188        .nodes
189        .iter()
190        .filter(|node| node.node_type.is_table_like())
191        .collect();
192    let column_nodes: Vec<_> = result
193        .nodes
194        .iter()
195        .filter(|node| node.node_type == NodeType::Column)
196        .collect();
197
198    let mut column_to_table: HashMap<&str, &str> = HashMap::new();
199    for edge in &result.edges {
200        if edge.edge_type == EdgeType::Ownership {
201            if let Some(table_node) = table_nodes.iter().find(|node| node.id == edge.from) {
202                let table_name = table_node
203                    .qualified_name
204                    .as_deref()
205                    .unwrap_or(&table_node.label);
206                column_to_table.insert(edge.to.as_ref(), table_name);
207            }
208        }
209    }
210
211    for edge in &result.edges {
212        if edge.edge_type == EdgeType::Derivation || edge.edge_type == EdgeType::DataFlow {
213            let source_col = column_nodes.iter().find(|col| col.id == edge.from);
214            let target_col = column_nodes.iter().find(|col| col.id == edge.to);
215
216            if let (Some(source), Some(target)) = (source_col, target_col) {
217                let source_table = column_to_table
218                    .get(edge.from.as_ref())
219                    .copied()
220                    .unwrap_or("Output");
221                let target_table = column_to_table
222                    .get(edge.to.as_ref())
223                    .copied()
224                    .unwrap_or("Output");
225
226                let expression: Option<String> = edge
227                    .expression
228                    .as_ref()
229                    .map(|value| value.to_string())
230                    .or_else(|| target.expression.as_ref().map(|value| value.to_string()));
231
232                mappings.push(ColumnMapping {
233                    source_table: source_table.to_string(),
234                    source_column: source.label.to_string(),
235                    target_table: target_table.to_string(),
236                    target_column: target.label.to_string(),
237                    expression,
238                    edge_type: edge_type_label(edge.edge_type).to_string(),
239                });
240            }
241        }
242    }
243
244    mappings
245}
246
247pub fn extract_table_dependencies(result: &AnalyzeResult) -> Vec<TableDependency> {
248    let mut dependencies = Vec::new();
249    let mut seen: BTreeSet<String> = BTreeSet::new();
250
251    let relation_nodes: Vec<_> = result
252        .nodes
253        .iter()
254        .filter(|node| node.node_type.is_relation())
255        .collect();
256
257    for edge in &result.edges {
258        if edge.edge_type == EdgeType::DataFlow || edge.edge_type == EdgeType::JoinDependency {
259            let source_node = relation_nodes.iter().find(|node| node.id == edge.from);
260            let target_node = relation_nodes.iter().find(|node| node.id == edge.to);
261
262            if let (Some(source), Some(target)) = (source_node, target_node) {
263                let source_key = source
264                    .qualified_name
265                    .as_deref()
266                    .unwrap_or(&source.label)
267                    .to_string();
268                let target_key = target
269                    .qualified_name
270                    .as_deref()
271                    .unwrap_or(&target.label)
272                    .to_string();
273                let dep_key = format!("{source_key}->{target_key}");
274
275                if source_key != target_key && seen.insert(dep_key) {
276                    dependencies.push(TableDependency {
277                        source_table: source_key,
278                        target_table: target_key,
279                    });
280                }
281            }
282        }
283    }
284
285    dependencies
286}
287
288fn edge_type_label(edge_type: EdgeType) -> &'static str {
289    match edge_type {
290        EdgeType::Ownership => "ownership",
291        EdgeType::DataFlow => "data_flow",
292        EdgeType::Derivation => "derivation",
293        EdgeType::JoinDependency => "join_dependency",
294        EdgeType::CrossStatement => "cross_statement",
295    }
296}