pub fn tables_ddl(prefix: &str) -> String {
format!(
r#"
-- Metadata about the export
CREATE TABLE {prefix}_meta (
key TEXT PRIMARY KEY,
value TEXT
);
-- SQL statements analyzed
CREATE TABLE {prefix}statements (
id INTEGER PRIMARY KEY,
statement_index INTEGER NOT NULL,
statement_type TEXT NOT NULL,
source_name TEXT,
span_start INTEGER,
span_end INTEGER,
join_count INTEGER NOT NULL DEFAULT 0,
complexity_score INTEGER
);
-- Graph nodes (tables, CTEs, columns, outputs).
-- Each node appears once. Statement participation is tracked via
-- {prefix}node_statements.
CREATE TABLE {prefix}nodes (
id TEXT PRIMARY KEY,
node_type TEXT NOT NULL,
label TEXT NOT NULL,
qualified_name TEXT,
canonical_catalog TEXT,
canonical_schema TEXT,
canonical_name TEXT,
canonical_column TEXT,
expression TEXT,
span_start INTEGER,
span_end INTEGER,
body_span_start INTEGER,
body_span_end INTEGER,
resolution_source TEXT
);
-- Junction: which statements each node participates in.
CREATE TABLE {prefix}node_statements (
node_id TEXT NOT NULL REFERENCES {prefix}nodes(id),
statement_id INTEGER NOT NULL REFERENCES {prefix}statements(id),
PRIMARY KEY (node_id, statement_id)
);
-- All name-occurrence spans for a node, accumulated across every statement
-- the node appears in. Ordered by (span_start, span_end) at write time.
CREATE TABLE {prefix}node_name_spans (
id INTEGER PRIMARY KEY,
node_id TEXT NOT NULL REFERENCES {prefix}nodes(id),
span_start INTEGER NOT NULL,
span_end INTEGER NOT NULL
);
-- Graph edges (data flow relationships). Each edge is unique by id.
CREATE TABLE {prefix}edges (
id TEXT PRIMARY KEY,
edge_type TEXT NOT NULL,
from_node_id TEXT NOT NULL REFERENCES {prefix}nodes(id),
to_node_id TEXT NOT NULL REFERENCES {prefix}nodes(id),
expression TEXT,
operation TEXT,
is_approximate BOOLEAN DEFAULT FALSE
);
-- Junction: which statements each edge participates in. Cross-statement
-- edges (`edge_type = 'cross_statement'`) carry both producer and consumer.
CREATE TABLE {prefix}edge_statements (
edge_id TEXT NOT NULL REFERENCES {prefix}edges(id),
statement_id INTEGER NOT NULL REFERENCES {prefix}statements(id),
PRIMARY KEY (edge_id, statement_id)
);
-- Join metadata attached to edges (not nodes) to support both table-level
-- and column-level join semantics.
CREATE TABLE {prefix}joins (
id INTEGER PRIMARY KEY,
edge_id TEXT NOT NULL REFERENCES {prefix}edges(id),
join_type TEXT NOT NULL,
join_condition TEXT
);
-- Filter predicates on nodes, scoped to the statements where they apply.
CREATE TABLE {prefix}filters (
id INTEGER PRIMARY KEY,
node_id TEXT NOT NULL REFERENCES {prefix}nodes(id),
statement_id INTEGER NOT NULL REFERENCES {prefix}statements(id),
predicate TEXT NOT NULL,
filter_type TEXT
);
-- Aggregation info on column nodes.
CREATE TABLE {prefix}aggregations (
node_id TEXT NOT NULL REFERENCES {prefix}nodes(id),
statement_id INTEGER NOT NULL REFERENCES {prefix}statements(id),
is_grouping_key BOOLEAN NOT NULL,
function TEXT,
is_distinct BOOLEAN DEFAULT FALSE,
PRIMARY KEY (node_id, statement_id)
);
-- Analysis issues
CREATE TABLE {prefix}issues (
id INTEGER PRIMARY KEY,
statement_id INTEGER REFERENCES {prefix}statements(id),
severity TEXT NOT NULL,
code TEXT NOT NULL,
message TEXT NOT NULL,
span_start INTEGER,
span_end INTEGER
);
-- Schema tables (imported + inferred)
CREATE TABLE {prefix}schema_tables (
id INTEGER PRIMARY KEY,
catalog TEXT,
schema_name TEXT,
name TEXT NOT NULL,
resolution_source TEXT,
UNIQUE(catalog, schema_name, name)
);
-- Schema columns
CREATE TABLE {prefix}schema_columns (
id INTEGER PRIMARY KEY,
table_id INTEGER NOT NULL REFERENCES {prefix}schema_tables(id),
name TEXT NOT NULL,
data_type TEXT,
is_nullable BOOLEAN,
is_primary_key BOOLEAN DEFAULT FALSE
);
"#,
prefix = prefix
)
}
pub fn views_ddl(prefix: &str) -> String {
format!(
r#"
-- ============================================================================
-- LINEAGE VIEWS
-- ============================================================================
-- Direct column-to-column lineage
CREATE VIEW {prefix}column_lineage AS
SELECT
e.id AS edge_id,
s.source_name,
s.statement_index,
fn.qualified_name AS source_table,
fn.label AS source_column,
tn.qualified_name AS target_table,
tn.label AS target_column,
e.expression AS transformation,
e.operation,
e.is_approximate
FROM {prefix}edges e
JOIN {prefix}edge_statements es ON es.edge_id = e.id
JOIN {prefix}statements s ON es.statement_id = s.id
JOIN {prefix}nodes fn ON e.from_node_id = fn.id
JOIN {prefix}nodes tn ON e.to_node_id = tn.id
WHERE fn.node_type = 'column'
AND tn.node_type = 'column'
AND e.edge_type IN ('data_flow', 'derivation');
-- Table-level dependencies
CREATE VIEW {prefix}table_dependencies AS
SELECT DISTINCT
s.source_name,
fn.qualified_name AS source_table,
tn.qualified_name AS target_table,
e.edge_type
FROM {prefix}edges e
JOIN {prefix}edge_statements es ON es.edge_id = e.id
JOIN {prefix}statements s ON es.statement_id = s.id
JOIN {prefix}nodes fn ON e.from_node_id = fn.id
JOIN {prefix}nodes tn ON e.to_node_id = tn.id
WHERE fn.node_type IN ('table', 'view', 'cte')
AND tn.node_type IN ('table', 'view', 'cte');
-- Recursive: all upstream columns
CREATE VIEW {prefix}column_ancestors AS
WITH RECURSIVE ancestors AS (
SELECT
to_node_id AS column_id,
from_node_id AS ancestor_id,
1 AS depth,
expression AS transformation
FROM {prefix}edges
WHERE edge_type IN ('data_flow', 'derivation')
UNION ALL
SELECT
a.column_id,
e.from_node_id AS ancestor_id,
a.depth + 1,
e.expression
FROM ancestors a
JOIN {prefix}edges e ON a.ancestor_id = e.to_node_id
WHERE e.edge_type IN ('data_flow', 'derivation')
AND a.depth < 50
)
SELECT
n1.qualified_name AS column_table,
n1.label AS column_name,
n2.qualified_name AS ancestor_table,
n2.label AS ancestor_column,
a.depth,
a.transformation
FROM ancestors a
JOIN {prefix}nodes n1 ON a.column_id = n1.id
JOIN {prefix}nodes n2 ON a.ancestor_id = n2.id
WHERE n1.node_type = 'column'
AND n2.node_type = 'column';
-- Recursive: all downstream columns
CREATE VIEW {prefix}column_descendants AS
WITH RECURSIVE descendants AS (
SELECT
from_node_id AS column_id,
to_node_id AS descendant_id,
1 AS depth,
expression AS transformation
FROM {prefix}edges
WHERE edge_type IN ('data_flow', 'derivation')
UNION ALL
SELECT
d.column_id,
e.to_node_id AS descendant_id,
d.depth + 1,
e.expression
FROM descendants d
JOIN {prefix}edges e ON d.descendant_id = e.from_node_id
WHERE e.edge_type IN ('data_flow', 'derivation')
AND d.depth < 50
)
SELECT
n1.qualified_name AS column_table,
n1.label AS column_name,
n2.qualified_name AS descendant_table,
n2.label AS descendant_column,
d.depth,
d.transformation
FROM descendants d
JOIN {prefix}nodes n1 ON d.column_id = n1.id
JOIN {prefix}nodes n2 ON d.descendant_id = n2.id
WHERE n1.node_type = 'column'
AND n2.node_type = 'column';
-- ============================================================================
-- GRAPH VIEWS
-- ============================================================================
-- Denormalized node details (one row per node × statement participation)
CREATE VIEW {prefix}node_details AS
SELECT
n.id,
ns.statement_id,
n.node_type,
n.label,
n.qualified_name,
n.expression,
n.resolution_source,
s.statement_index,
s.source_name,
s.statement_type,
a.is_grouping_key,
a.function AS aggregation_function,
a.is_distinct AS aggregation_distinct
FROM {prefix}nodes n
JOIN {prefix}node_statements ns ON ns.node_id = n.id
LEFT JOIN {prefix}statements s ON ns.statement_id = s.id
LEFT JOIN {prefix}aggregations a ON n.id = a.node_id AND ns.statement_id = a.statement_id;
-- Denormalized edge details (one row per edge × statement participation)
CREATE VIEW {prefix}edge_details AS
SELECT
e.id,
e.edge_type,
e.operation,
e.expression,
e.is_approximate,
fn.node_type AS from_type,
fn.label AS from_label,
fn.qualified_name AS from_qualified_name,
tn.node_type AS to_type,
tn.label AS to_label,
tn.qualified_name AS to_qualified_name,
j.join_type,
j.join_condition,
s.statement_index,
s.source_name
FROM {prefix}edges e
JOIN {prefix}edge_statements es ON es.edge_id = e.id
JOIN {prefix}statements s ON es.statement_id = s.id
JOIN {prefix}nodes fn ON e.from_node_id = fn.id
JOIN {prefix}nodes tn ON e.to_node_id = tn.id
LEFT JOIN {prefix}joins j ON e.id = j.edge_id;
-- All joins with context
CREATE VIEW {prefix}join_graph AS
SELECT
s.source_name,
s.statement_index,
j.join_type,
j.join_condition,
COALESCE(fr.qualified_name, fn.qualified_name) AS from_table,
COALESCE(fr.label, fn.label) AS from_label,
COALESCE(tr.qualified_name, tn.qualified_name) AS to_table,
COALESCE(tr.label, tn.label) AS to_label
FROM {prefix}joins j
JOIN {prefix}edges e ON j.edge_id = e.id
JOIN {prefix}edge_statements es ON es.edge_id = e.id
JOIN {prefix}statements s ON es.statement_id = s.id
JOIN {prefix}nodes fn ON e.from_node_id = fn.id
JOIN {prefix}nodes tn ON e.to_node_id = tn.id
LEFT JOIN {prefix}edges feo
ON feo.edge_type = 'ownership'
AND feo.to_node_id = e.from_node_id
LEFT JOIN {prefix}nodes fr ON feo.from_node_id = fr.id
LEFT JOIN {prefix}edges teo
ON teo.edge_type = 'ownership'
AND teo.to_node_id = e.to_node_id
LEFT JOIN {prefix}nodes tr ON teo.from_node_id = tr.id;
-- Filters applied to nodes
CREATE VIEW {prefix}node_filters AS
SELECT
n.qualified_name AS table_name,
n.label AS node_label,
n.node_type,
f.predicate,
f.filter_type,
s.source_name,
s.statement_index
FROM {prefix}filters f
JOIN {prefix}nodes n ON f.node_id = n.id
LEFT JOIN {prefix}statements s ON f.statement_id = s.id;
-- ============================================================================
-- METRICS VIEWS
-- ============================================================================
-- Complexity breakdown by statement
CREATE VIEW {prefix}complexity_by_statement AS
SELECT
s.source_name,
s.statement_index,
s.statement_type,
s.complexity_score,
s.join_count,
COUNT(DISTINCT CASE WHEN n.node_type = 'table' THEN n.id END) AS table_count,
COUNT(DISTINCT CASE WHEN n.node_type = 'column' THEN n.id END) AS column_count,
COUNT(DISTINCT e.id) AS edge_count
FROM {prefix}statements s
LEFT JOIN {prefix}node_statements ns ON ns.statement_id = s.id
LEFT JOIN {prefix}nodes n ON n.id = ns.node_id
LEFT JOIN {prefix}edge_statements es ON es.statement_id = s.id
LEFT JOIN {prefix}edges e ON e.id = es.edge_id
GROUP BY s.id, s.source_name, s.statement_index, s.statement_type,
s.complexity_score, s.join_count;
-- Issue summary with context
CREATE VIEW {prefix}issues_summary AS
SELECT
i.severity,
i.code,
i.message,
s.source_name,
s.statement_index,
s.statement_type,
i.span_start,
i.span_end
FROM {prefix}issues i
LEFT JOIN {prefix}statements s ON i.statement_id = s.id;
-- Table usage statistics
CREATE VIEW {prefix}table_usage AS
SELECT
n.qualified_name AS table_name,
n.node_type,
n.resolution_source,
COUNT(DISTINCT ns.statement_id) AS statement_count,
COUNT(DISTINCT e_in.id) AS incoming_edges,
COUNT(DISTINCT e_out.id) AS outgoing_edges
FROM {prefix}nodes n
LEFT JOIN {prefix}node_statements ns ON ns.node_id = n.id
LEFT JOIN {prefix}edges e_in ON n.id = e_in.to_node_id
LEFT JOIN {prefix}edges e_out ON n.id = e_out.from_node_id
WHERE n.node_type IN ('table', 'view', 'cte')
GROUP BY n.qualified_name, n.node_type, n.resolution_source;
-- Most connected columns
CREATE VIEW {prefix}column_connectivity AS
SELECT
n.qualified_name AS table_name,
n.label AS column_name,
COUNT(DISTINCT e_in.id) AS upstream_count,
COUNT(DISTINCT e_out.id) AS downstream_count,
COUNT(DISTINCT e_in.id) + COUNT(DISTINCT e_out.id) AS total_connections
FROM {prefix}nodes n
LEFT JOIN {prefix}edges e_in ON n.id = e_in.to_node_id
LEFT JOIN {prefix}edges e_out ON n.id = e_out.from_node_id
WHERE n.node_type = 'column'
GROUP BY n.id, n.qualified_name, n.label
HAVING COUNT(DISTINCT e_in.id) + COUNT(DISTINCT e_out.id) > 0
ORDER BY total_connections DESC;
-- Statements with issues
CREATE VIEW {prefix}statements_with_issues AS
SELECT
s.source_name,
s.statement_index,
s.statement_type,
s.complexity_score,
COUNT(CASE WHEN i.severity = 'error' THEN 1 END) AS error_count,
COUNT(CASE WHEN i.severity = 'warning' THEN 1 END) AS warning_count,
COUNT(CASE WHEN i.severity = 'info' THEN 1 END) AS info_count
FROM {prefix}statements s
JOIN {prefix}issues i ON i.statement_id = s.id
GROUP BY s.id, s.source_name, s.statement_index, s.statement_type, s.complexity_score;
-- ============================================================================
-- COMPLIANCE VIEWS
-- ============================================================================
-- Full data flow paths
CREATE VIEW {prefix}data_flow_paths AS
SELECT
s.source_name,
s.statement_index,
fn.qualified_name AS source_table,
fn.label AS source_column,
e.expression AS transformation,
e.operation,
tn.qualified_name AS target_table,
tn.label AS target_column,
CASE WHEN e.is_approximate THEN 'APPROXIMATE' ELSE 'EXACT' END AS lineage_confidence
FROM {prefix}edges e
JOIN {prefix}edge_statements es ON es.edge_id = e.id
JOIN {prefix}statements s ON es.statement_id = s.id
JOIN {prefix}nodes fn ON e.from_node_id = fn.id
JOIN {prefix}nodes tn ON e.to_node_id = tn.id
WHERE e.edge_type IN ('data_flow', 'derivation');
-- Impact analysis: columns by source table
CREATE VIEW {prefix}columns_by_source_table AS
SELECT DISTINCT
column_table AS source_table,
column_name AS source_column,
descendant_table AS affected_table,
descendant_column AS affected_column,
depth AS distance
FROM {prefix}column_descendants;
-- Transformation audit
CREATE VIEW {prefix}transformation_audit AS
SELECT
s.source_name,
s.statement_index,
fn.qualified_name AS input_table,
fn.label AS input_column,
e.expression AS transformation_expression,
e.operation AS transformation_type,
tn.qualified_name AS output_table,
tn.label AS output_column,
a.function AS aggregation_applied
FROM {prefix}edges e
JOIN {prefix}edge_statements es ON es.edge_id = e.id
JOIN {prefix}statements s ON es.statement_id = s.id
JOIN {prefix}nodes fn ON e.from_node_id = fn.id
JOIN {prefix}nodes tn ON e.to_node_id = tn.id
LEFT JOIN {prefix}aggregations a ON tn.id = a.node_id AND es.statement_id = a.statement_id
WHERE e.expression IS NOT NULL
OR a.function IS NOT NULL;
-- Cross-statement dependencies
CREATE VIEW {prefix}cross_statement_flow AS
SELECT
s1.source_name AS from_source,
s1.statement_index AS from_statement,
s2.source_name AS to_source,
s2.statement_index AS to_statement,
fn.qualified_name AS shared_object,
e.edge_type
FROM {prefix}edges e
JOIN {prefix}edge_statements es1 ON es1.edge_id = e.id
JOIN {prefix}edge_statements es2 ON es2.edge_id = e.id
JOIN {prefix}statements s1 ON es1.statement_id = s1.id
JOIN {prefix}statements s2 ON es2.statement_id = s2.id
JOIN {prefix}nodes fn ON e.from_node_id = fn.id
JOIN {prefix}nodes tn ON e.to_node_id = tn.id
WHERE e.edge_type = 'cross_statement'
AND s1.id < s2.id;
-- Schema coverage
CREATE VIEW {prefix}schema_coverage AS
SELECT
st.catalog,
st.schema_name,
st.name AS table_name,
st.resolution_source,
CASE WHEN COUNT(n.id) > 0 THEN TRUE ELSE FALSE END AS is_referenced,
COUNT(DISTINCT ns.statement_id) AS reference_count
FROM {prefix}schema_tables st
LEFT JOIN {prefix}nodes n ON n.qualified_name LIKE '%' || st.name || '%'
AND n.node_type IN ('table', 'view')
LEFT JOIN {prefix}node_statements ns ON ns.node_id = n.id
GROUP BY st.id, st.catalog, st.schema_name, st.name, st.resolution_source;
"#,
prefix = prefix
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_tables_ddl_no_prefix() {
let ddl = tables_ddl("");
assert!(ddl.contains("CREATE TABLE _meta"));
assert!(ddl.contains("CREATE TABLE statements"));
assert!(ddl.contains("CREATE TABLE nodes"));
assert!(ddl.contains("CREATE TABLE edges"));
assert!(ddl.contains("CREATE TABLE node_statements"));
assert!(ddl.contains("CREATE TABLE edge_statements"));
assert!(ddl.contains("CREATE TABLE issues"));
assert!(ddl.contains("REFERENCES statements(id)"));
assert!(ddl.contains("REFERENCES nodes(id)"));
}
#[test]
fn test_tables_ddl_with_prefix() {
let ddl = tables_ddl("lineage.");
assert!(ddl.contains("CREATE TABLE lineage._meta"));
assert!(ddl.contains("CREATE TABLE lineage.statements"));
assert!(ddl.contains("CREATE TABLE lineage.nodes"));
assert!(ddl.contains("CREATE TABLE lineage.edges"));
assert!(ddl.contains("CREATE TABLE lineage.node_statements"));
assert!(ddl.contains("CREATE TABLE lineage.edge_statements"));
assert!(ddl.contains("CREATE TABLE lineage.node_name_spans"));
assert!(ddl.contains("CREATE TABLE lineage.joins"));
assert!(ddl.contains("CREATE TABLE lineage.filters"));
assert!(ddl.contains("CREATE TABLE lineage.aggregations"));
assert!(ddl.contains("CREATE TABLE lineage.issues"));
assert!(ddl.contains("CREATE TABLE lineage.schema_tables"));
assert!(ddl.contains("CREATE TABLE lineage.schema_columns"));
assert!(ddl.contains("REFERENCES lineage.statements(id)"));
assert!(ddl.contains("REFERENCES lineage.nodes(id)"));
assert!(ddl.contains("REFERENCES lineage.schema_tables(id)"));
assert!(!ddl.contains("CREATE TABLE _meta "));
assert!(!ddl.contains("CREATE TABLE statements "));
assert!(!ddl.contains("CREATE TABLE nodes "));
}
#[test]
fn test_views_ddl_no_prefix() {
let ddl = views_ddl("");
assert!(ddl.contains("CREATE VIEW column_lineage"));
assert!(ddl.contains("CREATE VIEW table_dependencies"));
assert!(ddl.contains("CREATE VIEW column_ancestors"));
assert!(ddl.contains("CREATE VIEW column_descendants"));
assert!(ddl.contains("CREATE VIEW node_details"));
assert!(ddl.contains("CREATE VIEW edge_details"));
assert!(ddl.contains("CREATE VIEW join_graph"));
assert!(ddl.contains("CREATE VIEW node_filters"));
assert!(ddl.contains("FROM edges"));
assert!(ddl.contains("JOIN nodes"));
assert!(ddl.contains("JOIN statements"));
}
#[test]
fn test_views_ddl_with_prefix() {
let ddl = views_ddl("lineage.");
assert!(ddl.contains("CREATE VIEW lineage.column_lineage"));
assert!(ddl.contains("CREATE VIEW lineage.table_dependencies"));
assert!(ddl.contains("CREATE VIEW lineage.column_ancestors"));
assert!(ddl.contains("CREATE VIEW lineage.column_descendants"));
assert!(ddl.contains("CREATE VIEW lineage.node_details"));
assert!(ddl.contains("CREATE VIEW lineage.edge_details"));
assert!(ddl.contains("CREATE VIEW lineage.join_graph"));
assert!(ddl.contains("CREATE VIEW lineage.node_filters"));
assert!(ddl.contains("CREATE VIEW lineage.complexity_by_statement"));
assert!(ddl.contains("CREATE VIEW lineage.issues_summary"));
assert!(ddl.contains("CREATE VIEW lineage.table_usage"));
assert!(ddl.contains("CREATE VIEW lineage.column_connectivity"));
assert!(ddl.contains("CREATE VIEW lineage.statements_with_issues"));
assert!(ddl.contains("CREATE VIEW lineage.data_flow_paths"));
assert!(ddl.contains("CREATE VIEW lineage.columns_by_source_table"));
assert!(ddl.contains("CREATE VIEW lineage.transformation_audit"));
assert!(ddl.contains("CREATE VIEW lineage.cross_statement_flow"));
assert!(ddl.contains("CREATE VIEW lineage.schema_coverage"));
assert!(ddl.contains("FROM lineage.edges"));
assert!(ddl.contains("JOIN lineage.nodes"));
assert!(ddl.contains("JOIN lineage.statements"));
assert!(ddl.contains("FROM lineage.schema_tables"));
}
#[test]
fn test_no_double_prefix() {
let tables = tables_ddl("lineage.");
let views = views_ddl("lineage.");
assert!(!tables.contains("lineage.lineage."));
assert!(!views.contains("lineage.lineage."));
}
}