use flowscope_core::{
analyze, issue_codes, AnalysisOptions, AnalyzeRequest, AnalyzeResult, ColumnSchema,
ConstraintType, Dialect, Edge, EdgeType, FilterClauseType, JoinType, Node, NodeType,
SchemaMetadata, SchemaNamespaceHint, SchemaTable, Severity,
};
use rstest::rstest;
use std::collections::HashSet;
use std::fs;
use std::path::{Path, PathBuf};
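/// Runs `analyze` on the trimmed SQL with default options, a fixed test
/// `source_name`, and optional schema metadata.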
fn run_analysis(sql: &str, dialect: Dialect, schema: Option<SchemaMetadata>) -> AnalyzeResult {
analyze(&AnalyzeRequest {
sql: sql.trim().to_string(),
files: None,
dialect,
source_name: Some("integration_test".into()),
options: None,
schema,
#[cfg(feature = "templating")]
template_config: None,
})
}
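/// Like `run_analysis`, but threads caller-supplied `AnalysisOptions` through.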
fn run_analysis_with_options(
sql: &str,
dialect: Dialect,
schema: Option<SchemaMetadata>,
options: AnalysisOptions,
) -> AnalyzeResult {
analyze(&AnalyzeRequest {
sql: sql.trim().to_string(),
files: None,
dialect,
source_name: Some("integration_test".into()),
options: Some(options),
schema,
#[cfg(feature = "templating")]
template_config: None,
})
}
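/// Root of the on-disk SQL fixtures: `<manifest dir>/tests/fixtures`.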
fn fixtures_root() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests")
.join("fixtures")
}
fn dialect_fixture_dir(name: &str) -> PathBuf {
fixtures_root().join(name)
}
fn list_fixture_files(dir: &Path) -> Vec<String> {
let mut fixtures = Vec::new();
if dir.exists() {
for entry in fs::read_dir(dir).expect("failed to list fixtures") {
let entry = entry.expect("fixture entry");
if entry.file_type().map(|ft| ft.is_file()).unwrap_or(false) {
let path = entry.path();
if path.extension().and_then(|ext| ext.to_str()) == Some("sql") {
fixtures.push(path.file_name().unwrap().to_string_lossy().to_string());
}
}
}
}
fixtures.sort();
fixtures
}
fn load_sql_fixture(dialect: &str, name: &str) -> String {
let path = dialect_fixture_dir(dialect).join(name);
fs::read_to_string(&path).unwrap_or_else(|e| panic!("failed to read fixture {path:?}: {e}"))
}
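/// Collects the qualified name (falling back to the label) of every `Table`
/// node in the result.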
fn collect_table_names(result: &AnalyzeResult) -> HashSet<String> {
let mut tables = HashSet::new();
for node in &result.nodes {
if node.node_type == NodeType::Table {
let name = node.qualified_name.as_ref().unwrap_or(&node.label);
tables.insert(name.to_string());
}
}
tables
}
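/// Builds a `SchemaTable` fixture with untyped columns; the `column_*` helpers
/// below cover the typed, primary-key, and foreign-key variants.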
fn schema_table(
catalog: Option<&str>,
schema: Option<&str>,
name: &str,
columns: &[&str],
) -> SchemaTable {
SchemaTable {
catalog: catalog.map(|c| c.to_string()),
schema: schema.map(|s| s.to_string()),
name: name.to_string(),
columns: columns.iter().map(|col| column(col)).collect(),
}
}
fn column(name: &str) -> ColumnSchema {
ColumnSchema {
name: name.to_string(),
data_type: None,
is_primary_key: None,
foreign_key: None,
}
}
#[allow(dead_code)]
fn column_typed(name: &str, data_type: &str) -> ColumnSchema {
ColumnSchema {
name: name.to_string(),
data_type: Some(data_type.to_string()),
is_primary_key: None,
foreign_key: None,
}
}
#[allow(dead_code)]
fn column_pk(name: &str, data_type: &str) -> ColumnSchema {
ColumnSchema {
name: name.to_string(),
data_type: Some(data_type.to_string()),
is_primary_key: Some(true),
foreign_key: None,
}
}
#[allow(dead_code)]
fn column_fk(name: &str, data_type: &str, ref_table: &str, ref_column: &str) -> ColumnSchema {
use flowscope_core::ForeignKeyRef;
ColumnSchema {
name: name.to_string(),
data_type: Some(data_type.to_string()),
is_primary_key: None,
foreign_key: Some(ForeignKeyRef {
table: ref_table.to_string(),
column: ref_column.to_string(),
}),
}
}
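/// Statement-scoped view over a result's nodes and edges, so tests can assert
/// on a single statement's lineage graph.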
struct StmtView<'a> {
nodes: Vec<&'a Node>,
edges: Vec<&'a Edge>,
}
impl<'a> StmtView<'a> {
fn new(result: &'a AnalyzeResult, statement_index: usize) -> Self {
Self {
nodes: result.nodes_in_statement(statement_index).collect(),
edges: result.edges_in_statement(statement_index).collect(),
}
}
}
fn first_statement(result: &AnalyzeResult) -> StmtView<'_> {
let first = result
.statements
.first()
.expect("analysis should return at least one statement");
StmtView::new(result, first.statement_index)
}
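/// Labels of all `Column` nodes in a statement view.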
fn column_labels(lineage: &StmtView<'_>) -> Vec<String> {
lineage
.nodes
.iter()
.filter(|node| node.node_type == NodeType::Column)
.map(|node| node.label.to_string())
.collect()
}
fn collect_cte_names(result: &AnalyzeResult) -> HashSet<String> {
let mut ctes = HashSet::new();
for node in &result.nodes {
if node.node_type == NodeType::Cte {
ctes.insert(node.label.to_string());
}
}
ctes
}
fn issue_codes_list(result: &AnalyzeResult) -> Vec<String> {
result
.issues
.iter()
.map(|issue| issue.code.clone())
.collect()
}
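// Statement-scoped node and edge finders shared by the lineage assertions below.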
fn edges_by_type<'a>(lineage: &StmtView<'a>, edge_type: EdgeType) -> Vec<&'a Edge> {
lineage
.edges
.iter()
.copied()
.filter(|edge| edge.edge_type == edge_type)
.collect()
}
#[allow(dead_code)]
fn find_node_by_label<'a>(lineage: &StmtView<'a>, label: &str) -> Option<&'a Node> {
lineage
.nodes
.iter()
.copied()
.find(|node| &*node.label == label)
}
fn find_column_node<'a>(lineage: &StmtView<'a>, label: &str) -> Option<&'a Node> {
lineage
.nodes
.iter()
.copied()
.find(|node| node.node_type == NodeType::Column && &*node.label == label)
}
fn find_table_node<'a>(lineage: &StmtView<'a>, name: &str) -> Option<&'a Node> {
lineage.nodes.iter().copied().find(|node| {
node.node_type == NodeType::Table
&& (&*node.label == name || node.qualified_name.as_deref() == Some(name))
})
}
fn find_cte_node<'a>(lineage: &StmtView<'a>, name: &str) -> Option<&'a Node> {
lineage
.nodes
.iter()
.copied()
.find(|node| node.node_type == NodeType::Cte && &*node.label == name)
}
#[allow(dead_code)]
fn has_edge(lineage: &StmtView<'_>, from_label: &str, to_label: &str, edge_type: EdgeType) -> bool {
let from_node = find_node_by_label(lineage, from_label);
let to_node = find_node_by_label(lineage, to_label);
if let (Some(from), Some(to)) = (from_node, to_node) {
lineage
.edges
.iter()
.any(|edge| edge.from == from.id && edge.to == to.id && edge.edge_type == edge_type)
} else {
false
}
}
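// --- Dialect fixture smoke tests ------------------------------------------
// Every `.sql` fixture under `tests/fixtures/<dialect>` must parse, yield
// table or CTE lineage, and finish without errors.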
#[rstest]
#[case("generic", Dialect::Generic)]
#[case("postgres", Dialect::Postgres)]
#[case("snowflake", Dialect::Snowflake)]
#[case("bigquery", Dialect::Bigquery)]
#[case("redshift", Dialect::Redshift)]
#[case("mysql", Dialect::Mysql)]
fn multi_dialect_fixtures_cover_core_constructs(#[case] dir_name: &str, #[case] dialect: Dialect) {
let dir = dialect_fixture_dir(dir_name);
let fixtures = list_fixture_files(&dir);
assert!(
!fixtures.is_empty(),
"expected fixtures for dialect {dir_name}"
);
for fixture in fixtures {
let sql = load_sql_fixture(dir_name, &fixture);
let result = run_analysis(&sql, dialect, None);
assert!(
result.summary.statement_count >= 1,
"fixture {dir_name}/{fixture} produced no statements (issues: {:?})",
result.issues
);
assert!(
result
.nodes
.iter()
.any(|node| matches!(node.node_type, NodeType::Table | NodeType::Cte)),
"fixture {dir_name}/{fixture} should yield tables or CTEs"
);
assert!(
!result.summary.has_errors,
"fixture {dir_name}/{fixture} had unexpected errors: {:?}",
result.issues
);
}
}
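// --- Cross-statement lineage: producer/consumer edges in multi-statement scripts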
#[test]
fn multi_stage_pipeline_emits_cross_statement_edges() {
let sql = r#"
CREATE TABLE analytics.tmp_daily_rollup AS
WITH recent_orders AS (
SELECT o.id,
o.customer_id,
o.total_amount,
d.region
FROM analytics.orders o
JOIN analytics.dim_customers d
ON o.customer_id = d.customer_id
WHERE o.order_date >= '2024-01-01'
),
spend_per_customer AS (
SELECT customer_id,
SUM(total_amount) AS total_spend,
MAX(region) AS region
FROM recent_orders
GROUP BY customer_id
)
SELECT customer_id, total_spend, region
FROM spend_per_customer;
INSERT INTO analytics.customer_snapshots (customer_id, region, total_spend)
SELECT customer_id, region, total_spend
FROM analytics.tmp_daily_rollup;
WITH leaderboard AS (
SELECT region, SUM(total_spend) AS total_spend
FROM analytics.customer_snapshots
GROUP BY region
)
SELECT * FROM leaderboard;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
assert_eq!(
result.summary.statement_count, 3,
"expected CTAS + INSERT + SELECT"
);
let tables = collect_table_names(&result);
for expected in [
"analytics.orders",
"analytics.dim_customers",
"analytics.tmp_daily_rollup",
"analytics.customer_snapshots",
] {
assert!(
tables.contains(expected),
"missing lineage for {expected:?}"
);
}
let cross_edges: Vec<_> = result
.edges
.iter()
.filter(|edge| edge.edge_type == EdgeType::CrossStatement)
.collect();
assert!(
cross_edges.len() >= 2,
"expected cross-statement edges, got {:?}",
result.edges
);
}
#[test]
fn flatten_preserves_distinct_cross_statement_pairs() {
let sql = r#"
CREATE TABLE staging.shared AS SELECT 1 AS x;
SELECT x FROM staging.shared;
SELECT x + 1 FROM staging.shared;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let cross_edges: Vec<_> = result
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::CrossStatement)
.collect();
assert_eq!(
cross_edges.len(),
2,
"expected one cross-statement edge per consumer, got {cross_edges:#?}"
);
    let pairs: HashSet<(usize, usize)> = cross_edges
.iter()
.map(|e| {
assert_eq!(
e.statement_ids.len(),
2,
"cross-statement edge should carry [producer, consumer]"
);
(e.statement_ids[0], e.statement_ids[1])
})
.collect();
assert!(
pairs.contains(&(0, 1)),
"missing producer=0 consumer=1 pair"
);
assert!(
pairs.contains(&(0, 2)),
"missing producer=0 consumer=2 pair"
);
    let ids: HashSet<_> = cross_edges.iter().map(|e| &e.id).collect();
assert_eq!(ids.len(), cross_edges.len(), "edge IDs must remain unique");
}
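// --- Recursive CTEs, derived tables, schema resolution, set operations ----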
#[test]
fn recursive_ctes_produce_lineage_without_warnings() {
let sql = r#"
WITH RECURSIVE org_hierarchy AS (
SELECT e.employee_id, e.manager_id, 0 AS depth
FROM employees e
WHERE e.manager_id IS NULL
UNION ALL
SELECT child.employee_id, child.manager_id, parent.depth + 1
FROM employees child
JOIN org_hierarchy parent
ON child.manager_id = parent.employee_id
)
SELECT * FROM org_hierarchy;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
assert_eq!(result.summary.statement_count, 1);
let tables = collect_table_names(&result);
assert!(
tables.contains("employees"),
"recursive CTE should still record base table lineage"
);
assert!(
result
.issues
.iter()
.all(|issue| issue.severity != Severity::Warning),
"recursive CTEs should not emit warnings when supported"
);
let stmt = first_statement(&result);
let cte_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Cte && n.qualified_name.as_deref() == Some("org_hierarchy")
})
.collect();
assert!(
!cte_nodes.is_empty(),
"recursive CTE should produce at least one CTE node"
);
let employees_node = stmt
.nodes
.iter()
.find(|n| n.node_type == NodeType::Table && &*n.label == "employees")
.expect("base table should be present");
let cte_ids: HashSet<_> = cte_nodes.iter().map(|n| n.id.clone()).collect();
let has_recursive_edge = stmt
.edges
.iter()
.any(|e| cte_ids.contains(&e.from) && cte_ids.contains(&e.to));
assert!(
has_recursive_edge,
"recursive CTE should have an edge connecting recursive CTE instances"
);
let has_base_edge = stmt
.edges
.iter()
.any(|e| e.from == employees_node.id && cte_ids.contains(&e.to));
assert!(
has_base_edge,
"recursive CTE anchor should link base table to the CTE node"
);
}
#[test]
fn derived_tables_and_exists_predicates_produce_complete_lineage() {
let sql = r#"
WITH vip_flags AS (
SELECT DISTINCT user_id
FROM vip_users
)
SELECT agg.user_id,
agg.total_amount,
lp.payment_method
FROM (
SELECT o.user_id,
SUM(o.amount) AS total_amount
FROM orders o
JOIN payments p ON p.order_id = o.id
WHERE o.status = 'completed'
GROUP BY o.user_id
) AS agg
JOIN (
SELECT DISTINCT user_id,
MAX(method) AS payment_method
FROM payments
GROUP BY user_id
) AS lp
ON agg.user_id = lp.user_id
WHERE EXISTS (
SELECT 1
FROM vip_flags vf
WHERE vf.user_id = agg.user_id
);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
for expected in ["orders", "payments", "vip_users"] {
assert!(
tables.contains(expected),
"missing lineage for derived-table source {expected}; saw {tables:?}"
);
}
assert!(
result.summary.table_count >= 3,
"expected at least three physical tables"
);
assert!(
result.statements.first().is_some_and(|stmt| result
.edges_in_statement(stmt.statement_index)
.next()
.is_some()),
"expected data-flow edges connecting derived tables"
);
}
#[test]
fn schema_metadata_and_search_path_resolve_identifiers() {
let sql = r#"
WITH filtered_orders AS (
SELECT fo.order_id,
fo.customer_id,
fo.total_amount
FROM fact_orders fo
WHERE fo.region = 'us-east'
)
SELECT fo.order_id,
d.region,
d.loyalty_score
FROM filtered_orders fo
JOIN dim_customers d
ON fo.customer_id = d.customer_id;
"#;
let schema = SchemaMetadata {
allow_implied: true,
default_catalog: Some("analytics".into()),
default_schema: Some("marts".into()),
search_path: Some(vec![
SchemaNamespaceHint {
catalog: Some("analytics".into()),
schema: "marts".into(),
},
SchemaNamespaceHint {
catalog: Some("analytics".into()),
schema: "core".into(),
},
]),
case_sensitivity: None,
tables: vec![
schema_table(
Some("analytics"),
Some("marts"),
"fact_orders",
&["order_id", "customer_id", "total_amount", "region"],
),
schema_table(
Some("analytics"),
Some("core"),
"dim_customers",
&["customer_id", "region"],
),
],
};
let result = run_analysis(sql, Dialect::Postgres, Some(schema));
let tables = collect_table_names(&result);
for expected in [
"analytics.marts.fact_orders",
"analytics.core.dim_customers",
] {
assert!(
tables.contains(expected),
"search_path should resolve {expected}"
);
}
assert!(
result
.issues
.iter()
.any(|issue| issue.code == issue_codes::UNKNOWN_COLUMN),
"missing loyalty_score should raise UNKNOWN_COLUMN"
);
assert!(
!result.summary.has_errors,
"validation warnings should not flip has_errors"
);
}
#[test]
fn set_operations_track_all_source_tables() {
let sql = r#"
WITH combined AS (
SELECT order_id, 'pending' AS source
FROM pending_orders
UNION ALL
SELECT shipment_id AS order_id, 'shipment' AS source
FROM pending_shipments
),
filtered AS (
SELECT order_id FROM combined
EXCEPT
SELECT order_id FROM quarantined_orders
)
SELECT order_id FROM filtered
UNION
SELECT legacy_id FROM legacy_orders;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert_eq!(
result.summary.statement_count, 1,
"entire set operation should be a single statement"
);
let tables = collect_table_names(&result);
for expected in [
"pending_orders",
"pending_shipments",
"quarantined_orders",
"legacy_orders",
] {
assert!(
tables.contains(expected),
"set operation should track {expected}"
);
}
assert!(
!result.summary.has_errors,
"set operations fixture should succeed without errors"
);
}
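// --- ANSI construct coverage (`ansi_*`) ------------------------------------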
#[test]
fn ansi_select_registers_single_table_and_columns() {
let sql = r#"
SELECT u.id, u.email
FROM analytics.users u
WHERE u.is_active = TRUE;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert_eq!(result.summary.statement_count, 1);
let tables = collect_table_names(&result);
assert!(
tables.contains("analytics.users"),
"expected analytics.users in lineage, tables: {tables:?}"
);
let cols = column_labels(&first_statement(&result));
assert!(
cols.iter().any(|c| c == "id"),
"expected id column in output: {cols:?}"
);
assert!(
cols.iter().any(|c| c == "email"),
"expected email column in output: {cols:?}"
);
assert!(
!result.summary.has_errors,
"unexpected errors: {:?}",
result.issues
);
}
#[test]
fn ansi_join_variants_capture_all_tables() {
let sql = r#"
SELECT fs.order_id,
dc.customer_name,
ds.store_name,
r.region_name,
c.currency_code
FROM fact_sales fs
LEFT JOIN dim_customers dc ON dc.customer_id = fs.customer_id
RIGHT JOIN dim_stores ds ON ds.store_id = fs.store_id
FULL JOIN dim_regions r ON r.region_id = ds.region_id
CROSS JOIN dim_currency c;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
for expected in [
"fact_sales",
"dim_customers",
"dim_stores",
"dim_regions",
"dim_currency",
] {
assert!(tables.contains(expected), "missing join source {expected}");
}
assert!(
!result.summary.has_errors,
"join query produced errors: {:?}",
result.issues
);
}
#[test]
fn ansi_nested_ctes_register_each_virtual_table() {
let sql = r#"
WITH base_orders AS (
SELECT order_id, customer_id, total
FROM orders
),
ranked_orders AS (
SELECT order_id,
customer_id,
total,
ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY total DESC) AS rn
FROM base_orders
),
final_orders AS (
SELECT * FROM ranked_orders WHERE rn <= 5
)
SELECT * FROM final_orders;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let ctes = collect_cte_names(&result);
for expected in ["base_orders", "ranked_orders", "final_orders"] {
assert!(ctes.contains(expected), "missing CTE node {expected}");
}
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"physical base table should still be tracked: {tables:?}"
);
}
#[test]
fn ansi_reused_cte_is_deduplicated() {
let sql = r#"
WITH region_totals AS (
SELECT region, SUM(amount) AS total_amount
FROM orders
GROUP BY region
)
SELECT *
FROM region_totals rt
JOIN region_totals rt2 ON rt.region = rt2.region;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let ctes = collect_cte_names(&result);
assert_eq!(
ctes.len(),
1,
"region_totals should appear once even if referenced twice"
);
assert!(ctes.contains("region_totals"));
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"expected base table for reused CTE: {tables:?}"
);
}
#[test]
fn ansi_multi_statement_flow_updates_summary_and_cross_edges() {
let sql = r#"
SELECT id, email FROM users;
INSERT INTO daily_active_users (user_id)
SELECT id FROM users;
CREATE TABLE user_copy AS
SELECT id, email FROM users;
SELECT COUNT(*) FROM user_copy;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert_eq!(result.summary.statement_count, 4);
let cross_edges: Vec<_> = result
.edges
.iter()
.filter(|edge| edge.edge_type == EdgeType::CrossStatement)
.collect();
assert!(
!cross_edges.is_empty(),
"expected at least one cross-statement edge for user_copy consumption"
);
}
#[test]
fn ansi_insert_select_with_schema_flags_unknown_column() {
let sql = r#"
INSERT INTO analytics.daily_summary (order_id, amount, discount)
SELECT order_id, amount, discount
FROM analytics.orders;
"#;
let schema = SchemaMetadata {
allow_implied: true,
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
tables: vec![schema_table(
None,
None,
"analytics.orders",
&["order_id", "amount"],
)],
};
let result = run_analysis(sql, Dialect::Generic, Some(schema));
let issues = issue_codes_list(&result);
assert!(
issues.contains(&issue_codes::UNKNOWN_COLUMN.to_string()),
"expected UNKNOWN_COLUMN for missing discount, issues: {:?}",
result.issues
);
}
#[test]
fn ansi_create_table_as_union_tracks_targets_and_sources() {
let sql = r#"
CREATE TABLE analytics.daily_rollup AS
SELECT order_id FROM analytics.orders
UNION ALL
SELECT shipment_id FROM analytics.shipments;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
for expected in [
"analytics.daily_rollup",
"analytics.orders",
"analytics.shipments",
] {
assert!(
tables.contains(expected),
"missing CTAS participant {expected}, tables: {tables:?}"
);
}
}
#[test]
fn ansi_star_without_schema_emits_approximate_lineage() {
let sql = r#"
SELECT * FROM analytics.events;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let issues = issue_codes_list(&result);
assert!(
issues.contains(&issue_codes::APPROXIMATE_LINEAGE.to_string()),
"expected APPROXIMATE_LINEAGE info for SELECT * without schema"
);
}
#[test]
fn ansi_star_with_schema_expands_columns() {
let sql = r#"
SELECT * FROM analytics.events;
"#;
let schema = SchemaMetadata {
allow_implied: true,
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
tables: vec![schema_table(
None,
None,
"analytics.events",
&["user_id", "event_type", "event_time"],
)],
};
let result = run_analysis(sql, Dialect::Generic, Some(schema));
let issues = issue_codes_list(&result);
assert!(
!issues
.iter()
.any(|code| code == issue_codes::APPROXIMATE_LINEAGE),
"schema metadata should prevent approximate warnings: {:?}",
result.issues
);
assert!(
result.summary.column_count >= 3,
"column count should include expanded columns: {:?}",
result.summary
);
}
#[test]
fn ansi_window_functions_produce_derivation_edges() {
let sql = r#"
SELECT
o.user_id,
SUM(o.amount) OVER (PARTITION BY o.user_id ORDER BY o.created_at) AS rolling_total
FROM orders o;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
derivations.iter().any(|edge| {
edge.expression
.as_deref()
.map(|expr| expr.contains("OVER"))
.unwrap_or(false)
}),
"expected derivation edge capturing window expression: {:?}",
derivations
);
}
#[test]
fn ansi_values_clause_requires_no_tables() {
let sql = r#"
SELECT * FROM (VALUES (1, 'a'), (2, 'b')) AS v(id, label);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.is_empty(),
"VALUES clause should not emit table nodes: {tables:?}"
);
}
#[test]
fn ansi_table_function_emits_info_issue() {
let sql = r#"
SELECT *
FROM TABLE(generate_series(1, 3)) AS g(n);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let issues = issue_codes_list(&result);
assert!(
issues.contains(&issue_codes::UNSUPPORTED_SYNTAX.to_string()),
"table function should emit UNSUPPORTED_SYNTAX info"
);
}
#[test]
fn ansi_unnest_clause_keeps_base_table_lineage() {
let sql = r#"
SELECT item
FROM orders o
CROSS JOIN UNNEST(o.items) AS t(item);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"base table should still be tracked when UNNEST is used"
);
assert!(
!result.summary.has_errors,
"UNNEST support should not raise errors: {:?}",
result.issues
);
}
#[test]
fn ansi_pivot_usage_emits_warning() {
let sql = r#"
SELECT *
FROM (
SELECT region, month, revenue
FROM sales
) src
PIVOT (
SUM(revenue) FOR month IN ('jan', 'feb')
) AS p;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let issues = issue_codes_list(&result);
assert!(
issues.contains(&issue_codes::UNSUPPORTED_SYNTAX.to_string()),
"PIVOT should emit UNSUPPORTED_SYNTAX warning"
);
}
#[test]
fn ansi_cross_apply_tracks_lateral_sources() {
let sql = r#"
SELECT u.id, purchases.total
FROM users u
CROSS APPLY (
SELECT SUM(amount) AS total
FROM orders o
WHERE o.user_id = u.id
) purchases;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
for expected in ["users", "orders"] {
assert!(
tables.contains(expected),
"CROSS APPLY should capture {expected}"
);
}
}
#[test]
fn ansi_cte_shadowing_existing_table_prefers_cte() {
let sql = r#"
WITH daily_metrics AS (
SELECT *
FROM analytics.daily_metrics
WHERE metric_date >= CURRENT_DATE - 7
)
SELECT * FROM daily_metrics;
"#;
let schema = SchemaMetadata {
allow_implied: true,
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
tables: vec![schema_table(
None,
None,
"analytics.daily_metrics",
&["metric_date", "active_users"],
)],
};
let result = run_analysis(sql, Dialect::Generic, Some(schema));
let tables = collect_table_names(&result);
assert!(
tables.contains("analytics.daily_metrics"),
"base table should be registered from inside the CTE"
);
let ctes = collect_cte_names(&result);
assert!(
ctes.contains("daily_metrics"),
"shadowing CTE should still appear as virtual node"
);
}
#[test]
fn ansi_scalar_subquery_introduces_additional_table() {
let sql = r#"
WITH max_orders AS (
SELECT user_id, MAX(amount) AS max_amount
FROM orders
GROUP BY user_id
)
SELECT u.id,
(SELECT max_amount
FROM max_orders mo
WHERE mo.user_id = u.id) AS max_amount
FROM users u;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
for expected in ["users", "orders"] {
assert!(
tables.contains(expected),
"scalar subquery should include {expected}"
);
}
}
#[test]
fn ansi_correlated_predicates_capture_all_sources() {
let sql = r#"
WITH order_lookup AS (
SELECT DISTINCT user_id FROM orders
),
flagged_users AS (
SELECT DISTINCT user_id FROM fraud_flags
)
SELECT u.id
FROM users u
WHERE EXISTS (
SELECT 1 FROM order_lookup o WHERE o.user_id = u.id
)
AND u.id IN (
SELECT f.user_id FROM flagged_users f
);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
for expected in ["users", "orders", "fraud_flags"] {
assert!(
tables.contains(expected),
"correlated predicates should capture {expected}"
);
}
}
#[test]
fn ansi_group_by_and_having_keep_single_table_reference() {
let sql = r#"
SELECT customer_id, COUNT(*) AS total_orders
FROM orders
GROUP BY customer_id
HAVING COUNT(*) > 5;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert_eq!(
tables.len(),
1,
"GROUP BY/HAVING should not duplicate table entries: {tables:?}"
);
assert!(
tables.contains("orders"),
"orders table should be present in lineage"
);
}
#[test]
fn ansi_case_expressions_emit_derivation_edges() {
let sql = r#"
SELECT
CASE
WHEN amount > 100 THEN 'big'
ELSE 'small'
END AS spend_bucket
FROM orders;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
!derivations.is_empty(),
"CASE expression should create derivation edges"
);
}
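// --- DML lineage: UPDATE, DELETE, MERGE (`dml_*`) ---------------------------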
#[test]
fn dml_update_with_from_clause_tracks_source_tables() {
let sql = r#"
UPDATE analytics.target t
SET t.status = s.new_status,
t.updated_at = s.timestamp
FROM analytics.staging s
WHERE t.id = s.id;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("analytics.target"),
"UPDATE target should be tracked"
);
assert!(
tables.contains("analytics.staging"),
"UPDATE source (FROM) should be tracked"
);
}
#[test]
fn dml_update_with_subquery_captures_lineage() {
let sql = r#"
UPDATE users
SET tier = (
SELECT CASE
WHEN SUM(amount) > 10000 THEN 'platinum'
WHEN SUM(amount) > 1000 THEN 'gold'
ELSE 'silver'
END
FROM orders
WHERE orders.user_id = users.id
);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(tables.contains("users"), "UPDATE target should be tracked");
assert!(
tables.contains("orders"),
"UPDATE subquery source should be tracked"
);
}
#[test]
fn dml_delete_with_subquery_identifies_dependencies() {
let sql = r#"
DELETE FROM orders
WHERE user_id IN (
SELECT id FROM deleted_users
);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(tables.contains("orders"), "DELETE target should be tracked");
assert!(
tables.contains("deleted_users"),
"DELETE subquery source should be tracked"
);
}
#[test]
fn dml_delete_with_join_tracks_all_tables() {
let sql = r#"
DELETE FROM orders AS o
USING cancelled_subscriptions AS c
WHERE o.subscription_id = c.id
AND c.cancelled_date < CURRENT_DATE - 30;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"DELETE target alias should resolve to table"
);
assert!(
tables.contains("cancelled_subscriptions"),
"DELETE JOIN source should be tracked"
);
}
#[test]
fn dml_merge_statement_tracks_target_and_source() {
let sql = r#"
MERGE INTO analytics.customer_metrics t
USING analytics.daily_activity s
ON t.customer_id = s.customer_id AND t.date = s.date
WHEN MATCHED THEN
UPDATE SET t.activity_score = s.score, t.updated_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
INSERT (customer_id, date, activity_score, created_at)
VALUES (s.customer_id, s.date, s.score, CURRENT_TIMESTAMP);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("analytics.customer_metrics"),
"MERGE target should be tracked"
);
assert!(
tables.contains("analytics.daily_activity"),
"MERGE source should be tracked"
);
}
#[test]
fn dml_merge_with_complex_source_query() {
let sql = r#"
MERGE INTO target t
USING (
SELECT s.id,
s.value,
d.metadata
FROM source s
JOIN dimensions d ON s.dim_id = d.id
WHERE s.active = true
) src
ON t.id = src.id
WHEN MATCHED THEN UPDATE SET t.value = src.value
WHEN NOT MATCHED THEN INSERT (id, value) VALUES (src.id, src.value);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(tables.contains("target"), "MERGE target should be tracked");
assert!(
tables.contains("source"),
"MERGE subquery source 1 should be tracked"
);
assert!(
tables.contains("dimensions"),
"MERGE subquery source 2 should be tracked"
);
}
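// --- Column-level lineage (`column_lineage_*`) ------------------------------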
#[test]
fn column_lineage_using_clause_tracks_implicit_columns() {
let sql = r#"
SELECT t1.id, t1.name, t2.amount
FROM orders t1
JOIN payments t2 USING (order_id, customer_id);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
for expected in ["orders", "payments"] {
assert!(
tables.contains(expected),
"JOIN USING should track {expected}"
);
}
let stmt = first_statement(&result);
assert!(
!stmt.edges.is_empty(),
"JOIN USING should create column-level edges"
);
}
#[test]
fn column_lineage_natural_join_captures_tables() {
let sql = r#"
SELECT o.order_id, c.customer_name
FROM orders o
NATURAL JOIN customers c;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
for expected in ["orders", "customers"] {
assert!(
tables.contains(expected),
"NATURAL JOIN should track {expected}"
);
}
}
#[test]
fn column_lineage_multiple_aliases_to_same_column() {
let sql = r#"
SELECT id AS user_id,
id AS customer_id,
id AS account_id
FROM users;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let cols = column_labels(&first_statement(&result));
for expected in ["user_id", "customer_id", "account_id"] {
assert!(
cols.contains(&expected.to_string()),
"multiple aliases should create distinct column nodes: {expected}"
);
}
}
#[test]
fn column_lineage_renaming_chain_through_ctes() {
let sql = r#"
WITH stage1 AS (
SELECT user_id AS uid FROM orders
),
stage2 AS (
SELECT uid AS customer_id FROM stage1
),
stage3 AS (
SELECT customer_id AS cid FROM stage2
)
SELECT cid AS final_id FROM stage3;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"column renaming chain should preserve base table lineage"
);
let ctes = collect_cte_names(&result);
assert_eq!(
ctes.len(),
3,
"all intermediate CTEs in renaming chain should be tracked"
);
}
#[test]
fn column_lineage_coalesce_across_multiple_tables() {
let sql = r#"
SELECT
COALESCE(t1.email, t2.email, t3.email, 'unknown@example.com') AS email,
COALESCE(t1.phone, t2.phone) AS phone
FROM users t1
LEFT JOIN user_profiles t2 ON t1.id = t2.user_id
LEFT JOIN user_contacts t3 ON t1.id = t3.user_id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
for expected in ["users", "user_profiles", "user_contacts"] {
assert!(
tables.contains(expected),
"COALESCE should track all source tables: {expected}"
);
}
let stmt = first_statement(&result);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
!derivations.is_empty(),
"COALESCE should create derivation edges"
);
}
#[test]
fn column_lineage_concat_and_string_functions() {
let sql = r#"
SELECT
CONCAT(first_name, ' ', last_name) AS full_name,
UPPER(email) AS email_upper,
SUBSTRING(phone, 1, 3) AS area_code
FROM users;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
derivations.len() >= 3,
"string functions should create derivation edges for each computed column"
);
}
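// --- Advanced aggregations (`advanced_agg_*`) -------------------------------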
#[test]
fn advanced_agg_grouping_sets_tracks_source() {
let sql = r#"
SELECT region, product, SUM(sales) AS total_sales
FROM orders
GROUP BY GROUPING SETS ((region), (product), (region, product), ());
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"GROUPING SETS should track source table"
);
}
#[test]
fn advanced_agg_cube_preserves_lineage() {
let sql = r#"
SELECT region, product, quarter, SUM(revenue) AS total_revenue
FROM sales
GROUP BY CUBE (region, product, quarter);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("sales"),
"CUBE aggregation should track source table"
);
}
#[test]
fn advanced_agg_rollup_with_having() {
let sql = r#"
SELECT region, SUM(amount) AS total
FROM orders
GROUP BY ROLLUP (region)
HAVING SUM(amount) > 1000;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"ROLLUP with HAVING should track source"
);
}
#[test]
fn advanced_agg_filter_clause_on_aggregates() {
let sql = r#"
SELECT
user_id,
COUNT(*) FILTER (WHERE status = 'active') AS active_count,
COUNT(*) FILTER (WHERE status = 'inactive') AS inactive_count,
SUM(amount) FILTER (WHERE category = 'premium') AS premium_total
FROM orders
GROUP BY user_id;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"aggregate FILTER clause should track source table"
);
let stmt = first_statement(&result);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
!derivations.is_empty(),
"FILTER aggregates should create derivation edges"
);
}
#[test]
fn advanced_agg_nested_aggregations() {
let sql = r#"
SELECT region, AVG(product_total) AS avg_per_product
FROM (
SELECT region, product, SUM(amount) AS product_total
FROM sales
GROUP BY region, product
) AS subq
GROUP BY region;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("sales"),
"nested aggregations should track original source"
);
}
#[test]
fn advanced_agg_array_agg_and_string_agg() {
let sql = r#"
SELECT
user_id,
ARRAY_AGG(product_id ORDER BY purchase_date) AS purchased_products,
STRING_AGG(product_name, ', ') AS product_list
FROM purchases
GROUP BY user_id;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("purchases"),
"ARRAY_AGG/STRING_AGG should track source"
);
}
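// --- Self-join and repeated-alias instance scoping: distinct nodes, filter
// attribution, and flat/global lineage ---------------------------------------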
#[test]
fn self_join_multi_level_hierarchy() {
let sql = r#"
SELECT
e1.name AS employee,
e2.name AS manager,
e3.name AS director,
e4.name AS vp
FROM employees e1
LEFT JOIN employees e2 ON e1.manager_id = e2.id
LEFT JOIN employees e3 ON e2.manager_id = e3.id
LEFT JOIN employees e4 ON e3.manager_id = e4.id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(
table_nodes.len(),
4,
"self-join with 4 aliases should produce 4 distinct table nodes, got: {:?}",
table_nodes.iter().map(|n| &n.id).collect::<Vec<_>>()
);
for node in &table_nodes {
assert_eq!(
node.qualified_name.as_deref(),
Some("employees"),
"all self-join nodes should have canonical qualified_name"
);
}
let unique_ids: HashSet<_> = table_nodes.iter().map(|n| &n.id).collect();
assert_eq!(
unique_ids.len(),
4,
"each alias should have a unique node ID"
);
let cols = column_labels(&stmt);
for expected in ["employee", "manager", "director", "vp"] {
assert!(
cols.contains(&expected.to_string()),
"multi-level self-join should track all output columns: {expected}"
);
}
}
#[test]
fn self_join_with_aggregation() {
let sql = r#"
SELECT
e1.department_id,
COUNT(DISTINCT e1.id) AS employee_count,
COUNT(DISTINCT e2.id) AS manager_count
FROM employees e1
LEFT JOIN employees e2 ON e1.id = e2.manager_id
GROUP BY e1.department_id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(
table_nodes.len(),
2,
"self-join with 2 aliases should produce 2 distinct table nodes"
);
for node in &table_nodes {
        assert_eq!(node.qualified_name.as_deref(), Some("employees"));
}
}
#[test]
fn self_join_filters_attach_to_correct_instance() {
let sql = r#"
SELECT e1.name, e2.name
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
WHERE e1.active = true
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(table_nodes.len(), 2, "self-join should have 2 table nodes");
let nodes_with_filters: Vec<_> = table_nodes
.iter()
.filter(|n| !n.filters.is_empty())
.collect();
    assert_eq!(
        nodes_with_filters.len(),
        1,
        "only the e1 instance should have the filter"
    );
let filtered_node = nodes_with_filters[0];
assert_eq!(filtered_node.filters.len(), 1);
assert!(
filtered_node.filters[0].expression.contains("active"),
"filter should reference 'active'"
);
}
#[test]
fn shared_nodes_preserve_filters_per_statement_in_metadata() {
let sql = r#"
SELECT id FROM users WHERE active = true;
SELECT id FROM users WHERE deleted = false
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let users = result
.nodes
.iter()
.find(|node| node.node_type == NodeType::Table && node.label.as_ref() == "users")
.expect("shared users table node should exist");
let statement_filters = users
.metadata
.as_ref()
.and_then(|metadata| metadata.get("statementFilters"))
.and_then(|value| value.as_object())
.expect("statementFilters metadata should be present");
let first_filters = statement_filters
.get("0")
.and_then(|value| value.as_array())
.expect("statement 0 filters should be recorded");
let second_filters = statement_filters
.get("1")
.and_then(|value| value.as_array())
.expect("statement 1 filters should be recorded");
assert_eq!(first_filters.len(), 1);
assert_eq!(second_filters.len(), 1);
assert!(first_filters[0]["expression"]
.as_str()
.unwrap()
.contains("active"));
assert!(second_filters[0]["expression"]
.as_str()
.unwrap()
.contains("deleted"));
}
#[test]
fn self_join_column_ownership_is_instance_aware() {
let sql = r#"
SELECT e1.name AS emp_name, e2.name AS mgr_name
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(table_nodes.len(), 2);
for table_node in &table_nodes {
let owned_columns: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::Ownership && e.from == table_node.id)
.collect();
assert!(
!owned_columns.is_empty(),
"each self-join instance should own at least one column, but node {} has none",
table_node.id
);
}
let owned_by_first: HashSet<_> = stmt
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::Ownership && e.from == table_nodes[0].id)
.map(|e| &e.to)
.collect();
let owned_by_second: HashSet<_> = stmt
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::Ownership && e.from == table_nodes[1].id)
.map(|e| &e.to)
.collect();
assert!(
owned_by_first.is_disjoint(&owned_by_second),
"self-join instances should own disjoint column sets"
);
}
#[test]
fn self_join_wildcard_expands_all_relation_instances() {
let sql = r#"
SELECT *
FROM public.employees e1
JOIN public.employees e2 ON e1.manager_id = e2.id
"#;
let schema = SchemaMetadata {
allow_implied: true,
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
tables: vec![schema_table(
None,
Some("public"),
"employees",
&["id", "manager_id", "name"],
)],
};
let result = run_analysis(sql, Dialect::Postgres, Some(schema));
let stmt = first_statement(&result);
let table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|node| {
node.node_type == NodeType::Table
&& node.qualified_name.as_deref() == Some("public.employees")
})
.collect();
assert_eq!(
table_nodes.len(),
2,
"self-join should create 2 table nodes"
);
let output_node = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Output)
.expect("output node should exist");
for column_name in ["id", "manager_id", "name"] {
let output_column_id = stmt
.edges
.iter()
.find(|edge| {
edge.edge_type == EdgeType::Ownership
&& edge.from == output_node.id
&& stmt.nodes.iter().any(|node| {
node.id == edge.to
&& node.node_type == NodeType::Column
&& &*node.label == column_name
})
})
.map(|edge| edge.to.clone())
.unwrap_or_else(|| panic!("output column {column_name} should exist"));
let source_owner_ids: HashSet<_> = stmt
.edges
.iter()
.filter(|edge| edge.edge_type == EdgeType::DataFlow && edge.to == output_column_id)
.filter_map(|edge| {
stmt.edges
.iter()
.find(|ownership| {
ownership.edge_type == EdgeType::Ownership && ownership.to == edge.from
})
.map(|ownership| ownership.from.clone())
})
.collect();
assert_eq!(
source_owner_ids.len(),
2,
"wildcard-expanded column {column_name} should receive lineage from both self-join instances"
);
}
}
#[test]
fn schema_qualified_unaliased_self_join_reference_uses_distinct_instance() {
let sql = r#"
SELECT public.employees.id AS employee_id, e2.id AS manager_id
FROM public.employees
JOIN public.employees e2 ON public.employees.manager_id = e2.id
"#;
let schema = SchemaMetadata {
allow_implied: true,
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
tables: vec![schema_table(
None,
Some("public"),
"employees",
&["id", "manager_id", "name"],
)],
};
let result = run_analysis(sql, Dialect::Postgres, Some(schema));
let stmt = first_statement(&result);
let output_node = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Output)
.expect("output node should exist");
let output_column_id = |label: &str| {
stmt.edges
.iter()
.find(|edge| {
edge.edge_type == EdgeType::Ownership
&& edge.from == output_node.id
&& stmt.nodes.iter().any(|node| {
node.id == edge.to
&& node.node_type == NodeType::Column
&& &*node.label == label
})
})
.map(|edge| edge.to.clone())
.unwrap_or_else(|| panic!("{label} should exist as an output column"))
};
let source_owner_id = |output_column_id: &std::sync::Arc<str>| {
let source_column_id = stmt
.edges
.iter()
.find(|edge| edge.edge_type == EdgeType::DataFlow && edge.to == *output_column_id)
.map(|edge| edge.from.clone())
.expect("output column should have a source column");
stmt.edges
.iter()
.find(|edge| edge.edge_type == EdgeType::Ownership && edge.to == source_column_id)
.map(|edge| edge.from.clone())
.expect("source column should be owned by a table")
};
let employee_owner_id = source_owner_id(&output_column_id("employee_id"));
let manager_owner_id = source_owner_id(&output_column_id("manager_id"));
assert_ne!(
employee_owner_id, manager_owner_id,
"schema-qualified reference to the unaliased side should not collapse onto the aliased self-join node"
);
}
#[test]
fn self_join_preserves_distinct_instances_in_flat_lineage() {
let sql = r#"
SELECT e1.name, e2.name
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let stmt_table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(stmt_table_nodes.len(), 2);
let employees_nodes: Vec<_> = result
.nodes
.iter()
.filter(|n| n.canonical_name.as_ref().unwrap().name == "employees")
.collect();
assert_eq!(
employees_nodes.len(),
2,
"flat lineage should preserve both self-join instances of employees"
);
let global_node_ids: HashSet<_> = result.nodes.iter().map(|n| n.id.clone()).collect();
for edge in &result.edges {
assert!(
global_node_ids.contains(&edge.from),
"edge {} has missing source node {}",
edge.id,
edge.from
);
assert!(
global_node_ids.contains(&edge.to),
"edge {} has missing target node {}",
edge.id,
edge.to
);
}
}
#[test]
fn self_join_keeps_per_instance_source_columns_in_flat_lineage() {
let sql = r#"
SELECT
e1.name AS employee_name,
e2.name AS manager_name,
e3.name AS director_name
FROM employees e1
LEFT JOIN employees e2 ON e1.manager_id = e2.id
LEFT JOIN employees e3 ON e2.manager_id = e3.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let employees_nodes: Vec<_> = result
.nodes
.iter()
.filter(|node| {
node.node_type == NodeType::Table
&& node.canonical_name.as_ref().unwrap().name == "employees"
})
.collect();
assert_eq!(employees_nodes.len(), 3, "one node per self-join instance");
let source_name_nodes: Vec<_> = result
.nodes
.iter()
.filter(|node| {
node.node_type == NodeType::Column
&& node.canonical_name.as_ref().unwrap().schema.as_deref() == Some("employees")
&& node.canonical_name.as_ref().unwrap().name == "name"
})
.collect();
assert_eq!(
source_name_nodes.len(),
3,
"each self-join instance owns its own `name` source column"
);
for employees_node in &employees_nodes {
let owned_name_columns: Vec<_> = result
.edges
.iter()
.filter(|edge| {
edge.edge_type == EdgeType::Ownership
&& edge.from == employees_node.id
&& source_name_nodes.iter().any(|n| n.id == edge.to)
})
.collect();
assert_eq!(
owned_name_columns.len(),
1,
"each instance owns exactly one `name` source column"
);
}
let output_targets: HashSet<_> = result
.edges
.iter()
.filter(|edge| {
edge.edge_type == EdgeType::DataFlow
&& source_name_nodes.iter().any(|n| n.id == edge.from)
})
.map(|edge| edge.to.clone())
.collect();
assert_eq!(
output_targets.len(),
3,
"each instance's source column feeds its own output alias"
);
}
#[test]
fn global_lineage_merges_qualified_columns_across_self_joins_and_cte_instances() {
let sql = r#"
SELECT
e1.name AS employee_name,
e2.name AS manager_name,
e3.name AS director_name
FROM employees e1
LEFT JOIN employees e2 ON e1.manager_id = e2.id
LEFT JOIN employees e3 ON e2.manager_id = e3.id
WHERE e1.active = true AND e3.region = 'NA';
WITH org AS (
SELECT
id AS employee_id,
manager_id,
department_id
FROM employees
)
SELECT
a.employee_id,
b.employee_id AS manager_employee_id
FROM org a
JOIN org b ON a.manager_id = b.employee_id
WHERE a.department_id = 10;
"#;
let schema = SchemaMetadata {
allow_implied: true,
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
tables: vec![schema_table(
None,
None,
"employees",
&[
"id",
"manager_id",
"department_id",
"name",
"active",
"region",
],
)],
};
let result = run_analysis(sql, Dialect::Generic, Some(schema));
let employees_name_nodes: Vec<_> = result
.nodes
.iter()
.filter(|node| {
node.node_type == NodeType::Column
&& node.canonical_name.as_ref().unwrap().schema.as_deref() == Some("employees")
&& node.canonical_name.as_ref().unwrap().name == "name"
})
.collect();
assert_eq!(
employees_name_nodes.len(),
3,
"each self-join instance contributes its own employees.name node"
);
let org_employee_id_nodes: Vec<_> = result
.nodes
.iter()
.filter(|node| {
node.node_type == NodeType::Column
&& node.canonical_name.as_ref().unwrap().schema.as_deref() == Some("org")
&& node.canonical_name.as_ref().unwrap().name == "employee_id"
})
.collect();
assert_eq!(org_employee_id_nodes.len(), 1);
assert_eq!(org_employee_id_nodes[0].statement_ids, vec![1]);
}
#[test]
fn self_join_unqualified_column_reference_stays_ambiguous() {
let sql = r#"
SELECT id
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let ambiguity_issue = result.issues.iter().find(|issue| {
issue.code == issue_codes::UNRESOLVED_REFERENCE
&& issue.message.to_lowercase().contains("ambiguous")
});
assert!(
ambiguity_issue.is_some(),
"unqualified self-join columns should still be ambiguous"
);
}
#[test]
fn ambiguous_self_join_projection_does_not_emit_dangling_column_or_join_dependency() {
let sql = r#"
SELECT id
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
assert!(
result.issues.iter().any(|issue| {
issue.code == issue_codes::UNRESOLVED_REFERENCE
&& issue.message.to_lowercase().contains("ambiguous")
}),
"ambiguous self-join projection should still emit an ambiguity warning"
);
    assert!(
        !stmt.nodes
            .iter()
            .any(|node| node.node_type == NodeType::Column && &*node.label == "id"),
        "ambiguous projected column should not create a dangling output column node"
    );
assert!(
stmt.edges
.iter()
.all(|edge| edge.edge_type != EdgeType::JoinDependency),
"invalid ambiguous projection should not synthesize join-dependency edges"
);
}
#[test]
fn unresolved_bare_join_projection_remains_in_output_schema() {
let sql = r#"
SELECT name
FROM customers
JOIN orders ON customers.id = orders.customer_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
assert!(
result
.issues
.iter()
.any(|issue| issue.code == issue_codes::UNRESOLVED_REFERENCE),
"best-effort multi-table analysis should still surface the unresolved reference"
);
let output_node = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Output)
.expect("Output node should exist");
let visible_output_column = stmt.nodes.iter().find(|node| {
node.node_type == NodeType::Column
&& &*node.label == "name"
&& stmt.edges.iter().any(|edge| {
edge.edge_type == EdgeType::Ownership
&& edge.from == output_node.id
&& edge.to == node.id
})
});
assert!(
visible_output_column.is_some(),
"unresolved bare projections should remain visible in the output schema"
);
}
#[test]
fn repeated_cte_aliases_create_distinct_reference_nodes() {
let sql = r#"
WITH org AS (
SELECT id, manager_id
FROM employees
)
SELECT a.id
FROM org a
JOIN org b ON a.manager_id = b.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let cte_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Cte && n.qualified_name.as_deref() == Some("org"))
.collect();
assert_eq!(
cte_nodes.len(),
2,
"expected CTE definition (reused by first alias) plus one self-join instance node"
);
let unique_ids: HashSet<_> = cte_nodes.iter().map(|n| n.id.clone()).collect();
assert_eq!(
unique_ids.len(),
2,
"CTE self-join aliases should have distinct node IDs"
);
}
#[test]
fn repeated_cte_aliases_across_statements_keep_distinct_global_instance_nodes() {
let sql = r#"
WITH org AS (
SELECT id, manager_id
FROM employees
)
SELECT a.id
FROM org a
JOIN org b ON a.manager_id = b.id;
WITH org AS (
SELECT id, parent_id AS manager_id
FROM departments
)
SELECT a.id
FROM org a
JOIN org b ON a.manager_id = b.id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let global_org_nodes: Vec<_> = result
.nodes
.iter()
.filter(|node| {
node.node_type == NodeType::Cte && node.canonical_name.as_ref().unwrap().name == "org"
})
.collect();
assert_eq!(
global_org_nodes.len(),
4,
"global lineage should keep statement-local org definitions plus one org b instance per statement"
);
let statement_scoped_instances = global_org_nodes
.iter()
.filter(|node| node.statement_ids.len() == 1)
.count();
assert_eq!(
statement_scoped_instances, 4,
"CTE definitions and self-join instances should remain statement-local in global lineage"
);
let global_org_columns: Vec<_> = result
.nodes
.iter()
.filter(|node| {
node.node_type == NodeType::Column
&& node.canonical_name.as_ref().unwrap().schema.as_deref() == Some("org")
&& matches!(
node.canonical_name.as_ref().unwrap().name.as_str(),
"id" | "manager_id"
)
})
.collect();
assert!(
global_org_columns.len() >= 4,
"global lineage should keep distinct org column nodes for each statement-local CTE scope"
);
let cross_statement_org_columns: Vec<_> = global_org_columns
.iter()
.filter(|node| {
node.statement_ids
.iter()
.copied()
.collect::<HashSet<_>>()
.len()
> 1
})
.collect();
assert!(
cross_statement_org_columns.is_empty(),
"CTE-owned org columns should not merge across statements in global lineage"
);
}
#[test]
fn cte_self_join_alias_columns_do_not_leak_across_union_scopes() {
let sql = r#"
WITH emp AS (
SELECT id, name
FROM employees
),
dept AS (
SELECT id, name
FROM departments
)
SELECT b.name AS emp_name
FROM emp a
JOIN emp b ON a.id = b.id
UNION ALL
SELECT b.name AS dept_name
FROM dept a
JOIN dept b ON a.id = b.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
assert!(
!result.summary.has_errors,
"CTE alias reuse across UNION branches should analyze cleanly: {:?}",
result.issues
);
let output_node = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Output)
.expect("Output node should exist");
let dept_output = stmt
.nodes
.iter()
.find(|node| {
node.node_type == NodeType::Column
&& &*node.label == "dept_name"
&& stmt.edges.iter().any(|edge| {
edge.edge_type == EdgeType::Ownership
&& edge.from == output_node.id
&& edge.to == node.id
})
})
.expect("dept_name output column should exist");
let dept_source_ids: HashSet<_> = stmt
.edges
.iter()
.filter(|edge| edge.edge_type == EdgeType::DataFlow && edge.to == dept_output.id)
.map(|edge| edge.from.clone())
.collect();
assert!(
!dept_source_ids.is_empty(),
"dept_name should receive a direct data-flow edge from the dept branch source column"
);
let source_owner_labels: HashSet<_> = stmt
.edges
.iter()
.filter(|edge| edge.edge_type == EdgeType::Ownership && dept_source_ids.contains(&edge.to))
.filter_map(|edge| stmt.nodes.iter().find(|node| node.id == edge.from))
.map(|node| node.label.to_string())
.collect();
assert!(
source_owner_labels.contains("dept"),
"dept_name should be sourced from a dept CTE instance, saw owners {source_owner_labels:?}"
);
assert!(
!source_owner_labels.contains("emp"),
"dept_name should not reuse emp columns when aliases are reused across UNION branches, saw owners {source_owner_labels:?}"
);
}
#[test]
fn self_join_in_subquery_produces_distinct_nodes() {
let sql = r#"
SELECT sub.emp_name, sub.mgr_name
FROM (
SELECT e1.name AS emp_name, e2.name AS mgr_name
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
) sub
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(
table_nodes.len(),
2,
"self-join inside subquery should produce 2 distinct table nodes, got: {:?}",
table_nodes.iter().map(|n| &n.id).collect::<Vec<_>>()
);
for node in &table_nodes {
assert_eq!(
node.qualified_name.as_deref(),
Some("employees"),
"subquery self-join nodes should have canonical qualified_name"
);
}
let global_employees: Vec<_> = result
.nodes
.iter()
.filter(|n| n.canonical_name.as_ref().unwrap().name == "employees")
.collect();
assert_eq!(
global_employees.len(),
2,
"flat lineage preserves both subquery self-join instances"
);
}
#[test]
fn nested_self_join_aliases_keep_filters_isolated_per_scope() {
let sql = r#"
SELECT e2.name, sub.inner_mgr
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
JOIN (
SELECT e2.id, e2.name AS inner_mgr
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
WHERE e2.department = 'sales'
) sub ON sub.id = e2.id
WHERE e2.department = 'eng'
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let filtered_employee_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Table
&& n.qualified_name.as_deref() == Some("employees")
&& !n.filters.is_empty()
})
.collect();
assert_eq!(
filtered_employee_nodes.len(),
2,
"outer and inner self-join aliases should keep separate filtered nodes"
);
let filter_sets: HashSet<Vec<String>> = filtered_employee_nodes
.iter()
.map(|node| {
let mut filters: Vec<String> =
node.filters.iter().map(|f| f.expression.clone()).collect();
filters.sort();
filters
})
.collect();
assert!(
filter_sets.contains(&vec!["e2.department = 'eng'".to_string()]),
"expected one filtered node for the outer e2 alias, got {filter_sets:?}"
);
assert!(
filter_sets.contains(&vec!["e2.department = 'sales'".to_string()]),
"expected one filtered node for the inner e2 alias, got {filter_sets:?}"
);
}
#[test]
fn self_join_alias_matching_another_table_name() {
let sql = r#"
SELECT c1.name, orders.total
FROM customers c1
JOIN customers orders ON c1.referrer_id = orders.id
JOIN orders real_orders ON orders.id = real_orders.customer_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let customer_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Table && n.qualified_name.as_deref() == Some("customers")
})
.collect();
assert_eq!(
customer_nodes.len(),
2,
"customers self-join should produce 2 distinct nodes"
);
let order_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table && n.qualified_name.as_deref() == Some("orders"))
.collect();
assert_eq!(
order_nodes.len(),
1,
"the real 'orders' table should have exactly 1 node"
);
}
#[test]
fn three_way_self_join_filters_isolated() {
let sql = r#"
SELECT e1.name, e2.name, e3.name
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
JOIN employees e3 ON e2.manager_id = e3.id
WHERE e1.active = true AND e3.level = 'director'
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(
table_nodes.len(),
3,
"3-way self-join should produce 3 nodes"
);
let nodes_with_filters: Vec<_> = table_nodes
.iter()
.filter(|n| !n.filters.is_empty())
.collect();
assert_eq!(
nodes_with_filters.len(),
2,
"filters should attach to exactly 2 of the 3 self-join instances"
);
let nodes_without_filters: Vec<_> = table_nodes
.iter()
.filter(|n| n.filters.is_empty())
.collect();
assert_eq!(nodes_without_filters.len(), 1);
}
#[test]
fn self_join_unqualified_filter_applies_to_all_instances() {
let sql = r#"
SELECT e1.name, e2.name
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
WHERE active = true
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(table_nodes.len(), 2, "self-join should produce 2 nodes");
for node in &table_nodes {
assert!(
!node.filters.is_empty(),
"unqualified filter should apply to all instances, but node {} has no filters",
node.id
);
}
}
#[test]
fn self_join_unqualified_filter_with_other_join_applies_to_all_matching_instances() {
let sql = r#"
SELECT e1.name, e2.name, d.dept_name
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
JOIN departments d ON e1.dept_id = d.id
WHERE active = true
"#;
let schema = SchemaMetadata {
allow_implied: true,
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
tables: vec![
schema_table(
None,
None,
"employees",
&["id", "manager_id", "dept_id", "name", "active"],
),
schema_table(None, None, "departments", &["id", "dept_name"]),
],
};
let result = run_analysis(sql, Dialect::Generic, Some(schema));
let stmt = first_statement(&result);
let employee_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Table && n.qualified_name.as_deref() == Some("employees")
})
.collect();
assert_eq!(
employee_nodes.len(),
2,
"self-join should produce 2 employee nodes"
);
for node in &employee_nodes {
assert!(
node.filters.iter().any(|f| f.expression.contains("active")),
"ambiguous self-join filter should apply to all employee instances, but node {} has {:?}",
node.id,
node.filters
);
}
let departments_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Table && n.qualified_name.as_deref() == Some("departments")
})
.collect();
assert_eq!(
departments_nodes.len(),
1,
"regular join should produce 1 departments node"
);
assert!(
departments_nodes[0].filters.is_empty(),
"employee-only ambiguous filter should not attach to departments: {:?}",
departments_nodes[0].filters
);
}
#[test]
fn self_join_mixed_with_regular_join() {
let sql = r#"
SELECT e1.name, e2.name, d.dept_name
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
JOIN departments d ON e1.dept_id = d.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let emp_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Table && n.qualified_name.as_deref() == Some("employees")
})
.collect();
assert_eq!(
emp_nodes.len(),
2,
"self-join should produce 2 employee nodes"
);
let dept_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Table && n.qualified_name.as_deref() == Some("departments")
})
.collect();
assert_eq!(
dept_nodes.len(),
1,
"regular join should produce 1 department node"
);
let mut ids: Vec<_> = emp_nodes
.iter()
.chain(dept_nodes.iter())
.map(|n| &n.id)
.collect();
ids.sort();
ids.dedup();
assert_eq!(ids.len(), 3, "all node IDs should be distinct");
}
#[test]
fn cte_self_join_produces_distinct_nodes() {
let sql = r#"
WITH emp AS (
SELECT id, name, manager_id FROM employees
)
SELECT e1.name AS employee, e2.name AS manager
FROM emp e1
JOIN emp e2 ON e1.manager_id = e2.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let cte_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Cte)
.collect();
assert!(
!cte_nodes.is_empty(),
"CTE self-join should produce at least 1 CTE node"
);
    let global_emp: Vec<_> = result
        .nodes
        .iter()
        .filter(|n| {
            n.node_type == NodeType::Cte
                && n.canonical_name.as_ref().is_some_and(|c| c.name == "emp")
        })
        .collect();
assert!(
global_emp.len() >= 2,
"global lineage should preserve non-recursive CTE instances"
);
assert!(
result.edges.iter().all(|edge| {
!(edge.edge_type == EdgeType::DataFlow
&& edge.from == edge.to
&& global_emp.iter().any(|node| node.id == edge.from))
}),
"non-recursive CTE self-joins should not become global data-flow self-loops"
);
}
#[test]
fn self_join_mixed_qualified_and_unqualified_predicates() {
let sql = r#"
SELECT e1.name, e2.name
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
WHERE e1.active = true AND department = 'sales'
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(table_nodes.len(), 2, "self-join should produce 2 nodes");
let max_filters = table_nodes.iter().map(|n| n.filters.len()).max().unwrap();
assert!(
max_filters >= 2,
"node with qualified + unqualified filters should have at least 2 filters"
);
for node in &table_nodes {
let has_dept_filter = node
.filters
.iter()
.any(|f| f.expression.contains("department"));
assert!(
has_dept_filter,
"unqualified filter should apply to all instances, but node {} is missing it",
node.id
);
}
}
#[test]
fn self_join_without_aliases_produces_single_node() {
let sql = r#"
SELECT name
FROM employees
JOIN employees ON employees.manager_id = employees.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(
table_nodes.len(),
1,
"unaliased self-join should collapse to a single node for backward compatibility"
);
assert_eq!(table_nodes[0].qualified_name.as_deref(), Some("employees"));
}
#[test]
fn triple_self_join_each_alias_gets_distinct_filter() {
let sql = r#"
SELECT e1.name, e2.name, e3.name
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
JOIN employees e3 ON e2.manager_id = e3.id
WHERE e1.active = true AND e2.department = 'eng' AND e3.level = 'director'
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(
table_nodes.len(),
3,
"3-way self-join should produce 3 table nodes"
);
for node in &table_nodes {
assert_eq!(
node.filters.len(),
1,
"each self-join instance should have exactly 1 filter, but node {} has {}",
node.id,
node.filters.len()
);
}
let mut filter_texts: Vec<String> = table_nodes
.iter()
.flat_map(|n| n.filters.iter().map(|f| f.expression.clone()))
.collect();
filter_texts.sort();
assert!(
filter_texts.iter().any(|f| f.contains("active")),
"one filter should reference 'active'"
);
assert!(
filter_texts.iter().any(|f| f.contains("department")),
"one filter should reference 'department'"
);
assert!(
filter_texts.iter().any(|f| f.contains("level")),
"one filter should reference 'level'"
);
}
#[test]
fn self_join_alias_matches_canonical_name() {
let sql = r#"
SELECT e1.name, employees.name
FROM employees e1
JOIN employees employees ON e1.manager_id = employees.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(
table_nodes.len(),
1,
"self-join where alias matches canonical collapses to 1 node (known limitation)"
);
}
#[test]
fn cte_self_join_filters_attach_to_correct_instance() {
let sql = r#"
WITH org AS (
SELECT id, name, manager_id, department FROM employees
)
SELECT a.name, b.name
FROM org a
JOIN org b ON a.manager_id = b.id
WHERE a.department = 'eng' AND b.department = 'sales'
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let cte_ref_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Cte && n.qualified_name.as_deref() == Some("org"))
.collect();
assert!(
cte_ref_nodes.len() >= 2,
"CTE self-join should produce at least 2 CTE nodes (definition + instance), got {}",
cte_ref_nodes.len()
);
let nodes_with_filters: Vec<_> = cte_ref_nodes
.iter()
.filter(|n| !n.filters.is_empty())
.collect();
assert_eq!(
nodes_with_filters.len(),
2,
"each CTE self-join instance should receive its own filter"
);
}
#[test]
fn cte_self_join_filters_are_isolated_per_instance() {
let sql = r#"
WITH org AS (
SELECT id, name, manager_id, department FROM employees
)
SELECT a.name AS eng_name, b.name AS mgr_name
FROM org a
JOIN org b ON a.manager_id = b.id
WHERE a.department = 'eng' AND b.department = 'sales'
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let filtered_cte_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Cte && !n.filters.is_empty())
.collect();
assert_eq!(
filtered_cte_nodes.len(),
2,
"each CTE instance should receive its own filter"
);
let all_filter_texts: Vec<String> = filtered_cte_nodes
.iter()
.flat_map(|n| n.filters.iter().map(|f| f.expression.clone()))
.collect();
assert!(
all_filter_texts.iter().any(|f| f.contains("eng")),
"one instance should have the 'eng' filter, got: {:?}",
all_filter_texts
);
assert!(
all_filter_texts.iter().any(|f| f.contains("sales")),
"one instance should have the 'sales' filter, got: {:?}",
all_filter_texts
);
for node in &filtered_cte_nodes {
let filter_texts: Vec<_> = node.filters.iter().map(|f| &f.expression).collect();
let has_both = filter_texts.iter().any(|f| f.contains("eng"))
&& filter_texts.iter().any(|f| f.contains("sales"));
assert!(
!has_both,
"CTE node {} should not have both filters, got: {:?}",
node.id, filter_texts
);
}
}
#[test]
fn nested_cte_self_join_aliases_keep_filters_isolated_per_scope() {
let sql = r#"
WITH org AS (
SELECT id, name, manager_id, department
FROM employees
)
SELECT b.name, sub.inner_mgr
FROM org a
JOIN org b ON a.manager_id = b.id
JOIN (
SELECT b.id, b.name AS inner_mgr
FROM org a
JOIN org b ON a.manager_id = b.id
WHERE b.department = 'sales'
) sub ON sub.id = b.id
WHERE b.department = 'eng'
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let filtered_cte_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Cte
&& n.qualified_name.as_deref() == Some("org")
&& !n.filters.is_empty()
})
.collect();
assert_eq!(
filtered_cte_nodes.len(),
2,
"outer and inner CTE self-join aliases should keep separate filtered nodes"
);
let filter_sets: HashSet<Vec<String>> = filtered_cte_nodes
.iter()
.map(|node| {
let mut filters: Vec<String> =
node.filters.iter().map(|f| f.expression.clone()).collect();
filters.sort();
filters
})
.collect();
assert!(
filter_sets.contains(&vec!["b.department = 'eng'".to_string()]),
"expected one filtered node for the outer b alias, got {filter_sets:?}"
);
assert!(
filter_sets.contains(&vec!["b.department = 'sales'".to_string()]),
"expected one filtered node for the inner b alias, got {filter_sets:?}"
);
}
#[test]
fn self_join_with_subquery_alias_conflict() {
let sql = r#"
SELECT t1.name, t2.id
FROM (SELECT name, id FROM employees WHERE active = true) t1
JOIN employees t2 ON t1.id = t2.manager_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let table_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert!(
!table_nodes.is_empty(),
"should have table nodes for employees"
);
let derived_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Cte)
.collect();
assert!(
!derived_nodes.is_empty(),
"derived subquery should produce a CTE-like node"
);
let global_node_ids: HashSet<_> = result.nodes.iter().map(|n| n.id.clone()).collect();
for edge in &result.edges {
assert!(
global_node_ids.contains(&edge.from),
"global edge {} has missing source node {}",
edge.id,
edge.from
);
assert!(
global_node_ids.contains(&edge.to),
"global edge {} has missing target node {}",
edge.id,
edge.to
);
}
}
#[test]
fn self_join_flat_edges_resolve_correctly() {
let sql = r#"
SELECT e1.name AS emp_name, e2.name AS mgr_name
FROM employees e1
JOIN employees e2 ON e1.manager_id = e2.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
    let employees_nodes: Vec<_> = result
        .nodes
        .iter()
        .filter(|n| {
            n.node_type == NodeType::Table
                && n.canonical_name
                    .as_ref()
                    .is_some_and(|c| c.name == "employees")
        })
        .collect();
assert_eq!(employees_nodes.len(), 2, "one node per self-join instance");
let node_ids: HashSet<_> = result.nodes.iter().map(|n| n.id.clone()).collect();
for edge in &result.edges {
assert!(
node_ids.contains(&edge.from),
"edge from={} not found in nodes",
edge.from
);
assert!(
node_ids.contains(&edge.to),
"edge to={} not found in nodes",
edge.to
);
}
for employees_node in &employees_nodes {
let ownership_edges: Vec<_> = result
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::Ownership && e.from == employees_node.id)
.collect();
assert!(
!ownership_edges.is_empty(),
"each employees instance should own column nodes"
);
}
}
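// A companion sketch, not part of the original suite: it re-checks the global
// edge-integrity invariant (every edge endpoint resolves to a node) asserted
// by self_join_with_subquery_alias_conflict and
// self_join_flat_edges_resolve_correctly, this time over a CTE-based
// self-join. Only helpers already defined in this file are used.
#[test]
fn edge_endpoints_resolve_for_cte_self_join() {
    let sql = r#"
        WITH mgr AS (SELECT id, name FROM employees)
        SELECT e.name, m.name
        FROM employees e
        JOIN mgr m ON e.manager_id = m.id
    "#;
    let result = run_analysis(sql, Dialect::Generic, None);
    let node_ids: HashSet<_> = result.nodes.iter().map(|n| n.id.clone()).collect();
    for edge in &result.edges {
        assert!(
            node_ids.contains(&edge.from),
            "edge {} has missing source node {}",
            edge.id,
            edge.from
        );
        assert!(
            node_ids.contains(&edge.to),
            "edge {} has missing target node {}",
            edge.id,
            edge.to
        );
    }
}
// Warehouse-style analytics patterns: star-schema joins and slowly changing
// dimension (SCD) range joins.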
#[test]
fn complex_pattern_star_schema_joins() {
let sql = r#"
SELECT
f.sale_id,
d_time.year,
d_time.quarter,
d_product.category,
d_product.brand,
d_customer.segment,
d_store.region,
f.amount
FROM fact_sales f
JOIN dim_time d_time ON f.time_id = d_time.id
JOIN dim_product d_product ON f.product_id = d_product.id
JOIN dim_customer d_customer ON f.customer_id = d_customer.id
JOIN dim_store d_store ON f.store_id = d_store.id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
for expected in [
"fact_sales",
"dim_time",
"dim_product",
"dim_customer",
"dim_store",
] {
assert!(
tables.contains(expected),
"star schema should track all dimension tables: {expected}"
);
}
}
#[test]
fn complex_pattern_slowly_changing_dimension() {
let sql = r#"
SELECT
f.transaction_id,
f.transaction_date,
d.customer_name,
d.customer_tier,
d.effective_from,
d.effective_to
FROM fact_transactions f
JOIN dim_customer_scd d
ON f.customer_id = d.customer_id
AND f.transaction_date >= d.effective_from
AND f.transaction_date < COALESCE(d.effective_to, '9999-12-31');
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
for expected in ["fact_transactions", "dim_customer_scd"] {
assert!(
tables.contains(expected),
"SCD pattern should track {expected}"
);
}
}
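// INSERT coverage: multi-row VALUES, DEFAULT VALUES, ON CONFLICT upserts, and
// CTE-fed inserts should all track their target and source tables.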
#[test]
fn insert_multi_row_values() {
let sql = r#"
INSERT INTO users (id, name, email)
VALUES
(1, 'Alice', 'alice@example.com'),
(2, 'Bob', 'bob@example.com'),
(3, 'Charlie', 'charlie@example.com');
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("users"),
"multi-row INSERT should track target table"
);
}
#[test]
fn insert_with_default_values() {
let sql = r#"
INSERT INTO logs DEFAULT VALUES;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("logs"),
"INSERT DEFAULT VALUES should track target"
);
}
#[test]
fn insert_on_conflict_postgres() {
let sql = r#"
INSERT INTO users (id, email, updated_at)
SELECT id, email, CURRENT_TIMESTAMP
FROM staging_users
ON CONFLICT (id)
DO UPDATE SET
email = EXCLUDED.email,
updated_at = EXCLUDED.updated_at;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
for expected in ["users", "staging_users"] {
assert!(
tables.contains(expected),
"INSERT ON CONFLICT should track {expected}"
);
}
}
#[test]
fn insert_with_cte_source() {
let sql = r#"
WITH prepared_data AS (
SELECT
id,
UPPER(name) AS name,
LOWER(email) AS email
FROM staging
WHERE valid = true
)
INSERT INTO users (id, name, email)
SELECT id, name, email FROM prepared_data;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
for expected in ["users", "staging"] {
assert!(
tables.contains(expected),
"INSERT with CTE should track {expected}"
);
}
let ctes = collect_cte_names(&result);
assert!(
ctes.contains("prepared_data"),
"INSERT should track CTE used in source"
);
}
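// Snowflake-specific syntax: QUALIFY, LATERAL FLATTEN, and time travel.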
#[test]
fn snowflake_qualify_clause_filters_window_results() {
let sql = r#"
SELECT
user_id,
order_date,
amount,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY order_date DESC) AS rn
FROM orders
QUALIFY rn = 1;
"#;
let result = run_analysis(sql, Dialect::Snowflake, None);
assert!(
result.summary.statement_count >= 1,
"QUALIFY clause should parse in Snowflake"
);
}
#[test]
fn snowflake_flatten_lateral_unnest() {
let sql = r#"
SELECT
u.id,
f.value::STRING AS tag
FROM analytics.users u,
LATERAL FLATTEN(input => u.tags) f
WHERE f.value IS NOT NULL;
"#;
let result = run_analysis(sql, Dialect::Snowflake, None);
assert!(
result.summary.statement_count >= 1,
"FLATTEN should parse in Snowflake"
);
}
#[test]
fn snowflake_time_travel_query() {
let sql = r#"
SELECT order_id, amount
FROM orders
AT(TIMESTAMP => '2024-01-01 00:00:00'::timestamp);
"#;
let result = run_analysis(sql, Dialect::Snowflake, None);
    // Time-travel syntax may not be supported by the parser, so no particular
    // statement count is asserted; the contract is only that analysis
    // completes without panicking.
    let _ = result.summary.statement_count;
}
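// BigQuery-specific syntax: STRUCT/ARRAY_AGG, SELECT * EXCEPT/REPLACE,
// UNNEST, and hyphenated project-qualified table references.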
#[test]
fn bigquery_struct_and_array_agg() {
let sql = r#"
SELECT
user_id,
ARRAY_AGG(STRUCT(product_id, quantity, price)) AS items
FROM order_items
GROUP BY user_id;
"#;
let result = run_analysis(sql, Dialect::Bigquery, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("order_items"),
"STRUCT/ARRAY_AGG should track source"
);
}
#[test]
fn bigquery_except_and_replace_modifiers() {
let sql = r#"
SELECT * EXCEPT (password, ssn)
REPLACE (UPPER(email) AS email, LOWER(name) AS name)
FROM users;
"#;
let result = run_analysis(sql, Dialect::Bigquery, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("users"),
"EXCEPT/REPLACE modifiers should track table"
);
}
#[test]
fn bigquery_unnest_arrays() {
let sql = r#"
SELECT u.user_id, tag
FROM users u
CROSS JOIN UNNEST(u.tags) AS tag
WHERE tag LIKE 'tech%';
"#;
let result = run_analysis(sql, Dialect::Bigquery, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("users"),
"UNNEST should preserve base table lineage"
);
}
#[test]
fn bigquery_hyphenated_project_refs() {
let sql = r#"
SELECT id, name
FROM `project-a.dataset-b.users`;
"#;
let result = run_analysis(sql, Dialect::Bigquery, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("project-a.dataset-b.users"),
"hyphenated project refs should track full qualified name"
);
}
#[test]
fn bigquery_hyphenated_refs_join() {
let sql = r#"
SELECT
u.user_id,
o.order_total
FROM `my-company.core.users` u
JOIN `my-company.sales.orders` o ON u.user_id = o.user_id;
"#;
let result = run_analysis(sql, Dialect::Bigquery, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("my-company.core.users"),
"should track hyphenated users table"
);
assert!(
tables.contains("my-company.sales.orders"),
"should track hyphenated orders table"
);
}
#[test]
fn bigquery_unnest_with_offset() {
let sql = r#"
SELECT item, offset_pos
FROM UNNEST([10, 20, 30]) AS item WITH OFFSET AS offset_pos;
"#;
let result = run_analysis(sql, Dialect::Bigquery, None);
    assert!(
        result.issues.iter().all(|i| i.severity != Severity::Error),
        "UNNEST with OFFSET should parse without errors"
    );
}
#[test]
fn bigquery_unnest_struct_expansion() {
let sql = r#"
SELECT
order_id,
line_item.product_id,
line_item.quantity
FROM orders,
UNNEST(line_items) AS line_item;
"#;
let result = run_analysis(sql, Dialect::Bigquery, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"UNNEST struct expansion should track source table"
);
}
#[test]
fn bigquery_select_except_excludes_columns() {
let sql = r#"
SELECT * EXCEPT (password, ssn)
FROM users;
"#;
let result = run_analysis(sql, Dialect::Bigquery, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("users"),
"SELECT EXCEPT should track source table"
);
}
#[test]
fn bigquery_select_replace_transforms() {
let sql = r#"
SELECT * REPLACE (UPPER(email) AS email)
FROM customers;
"#;
let result = run_analysis(sql, Dialect::Bigquery, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("customers"),
"SELECT REPLACE should track source table"
);
}
#[test]
fn bigquery_select_except_replace_combined() {
let sql = r#"
SELECT * EXCEPT (internal_id)
REPLACE (ROUND(price, 2) AS price, LOWER(sku) AS sku)
FROM products;
"#;
let result = run_analysis(sql, Dialect::Bigquery, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("products"),
"Combined EXCEPT/REPLACE should track source table"
);
}
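// Postgres-specific syntax: DISTINCT ON, JSON operators, and array
// operators/functions.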
#[test]
fn postgres_distinct_on_clause() {
let sql = r#"
SELECT DISTINCT ON (user_id)
user_id,
order_date,
amount
FROM orders
ORDER BY user_id, order_date DESC;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"DISTINCT ON should track source table"
);
}
#[test]
fn postgres_json_operators() {
let sql = r#"
SELECT
data->>'user' AS user_name,
data->'metadata'->>'email' AS email,
(data#>>'{address,city}') AS city
FROM events;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("events"),
"JSON operators should track source table"
);
let stmt = first_statement(&result);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
!derivations.is_empty(),
"JSON extraction should create derivation edges"
);
}
#[test]
fn postgres_array_operators_and_functions() {
let sql = r#"
SELECT
product_id,
name
FROM products
WHERE tags @> ARRAY['electronics', 'sale']
OR 'premium' = ANY(tags);
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("products"),
"array operators should track table"
);
}
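// Error tolerance: semantically questionable queries (ambiguous columns,
// unknown tables, column-count mismatches) should still produce lineage
// rather than fail the analysis.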
#[test]
fn error_ambiguous_column_reference() {
let sql = r#"
SELECT id
FROM orders o
JOIN users u ON o.user_id = u.id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("orders") && tables.contains("users"),
"ambiguous column should not prevent table tracking"
);
}
#[test]
fn error_unknown_table_without_schema() {
let sql = r#"
SELECT id, name FROM nonexistent_table;
"#;
let schema = SchemaMetadata {
allow_implied: true,
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
tables: vec![schema_table(None, None, "users", &["id", "name"])],
};
let result = run_analysis(sql, Dialect::Generic, Some(schema));
assert!(
result.summary.statement_count >= 1,
"query with unknown table should still parse"
);
}
#[test]
fn error_column_count_mismatch_in_insert() {
let sql = r#"
INSERT INTO users (id, name)
SELECT id, name, email, age FROM staging;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("users") && tables.contains("staging"),
"column mismatch should not prevent lineage tracking"
);
}
#[test]
fn error_invalid_alias_in_where_clause() {
let sql = r#"
SELECT name AS user_name
FROM users
WHERE user_name = 'Alice';
"#;
let result = run_analysis(sql, Dialect::Generic, None);
    assert!(
        !result.summary.has_errors,
        "referencing a SELECT alias in WHERE should not be flagged as an error; its validity is dialect-specific"
    );
}
#[test]
fn error_missing_group_by_column() {
let sql = r#"
SELECT user_id, region, COUNT(*) AS total
FROM orders
GROUP BY user_id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"GROUP BY errors should not prevent lineage"
);
}
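// DDL and multi-statement scripts: CREATE VIEW/TABLE node types, forward
// references, temp-table pipelines, and cross-statement lineage edges.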
#[test]
fn ddl_create_view_tracks_dependencies() {
let sql = r#"
CREATE VIEW active_user_orders AS
SELECT
u.id,
u.name,
o.order_id,
o.amount
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.active = true;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(
result.summary.statement_count >= 1,
"CREATE VIEW should parse"
);
let view_node = result
.nodes_in_statement(0)
.find(|n| &*n.label == "active_user_orders");
assert!(view_node.is_some(), "Should find view node");
assert_eq!(
view_node.unwrap().node_type,
NodeType::View,
"CREATE VIEW should create a View node type, not Table"
);
}
#[test]
fn ddl_create_view_with_cte() {
let sql = r#"
CREATE OR REPLACE VIEW customer_summary AS
WITH order_stats AS (
SELECT
customer_id,
COUNT(*) AS order_count,
SUM(amount) AS total_spent
FROM orders
GROUP BY customer_id
)
SELECT
c.id,
c.name,
COALESCE(os.order_count, 0) AS orders,
COALESCE(os.total_spent, 0) AS spent
FROM customers c
LEFT JOIN order_stats os ON c.id = os.customer_id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(
result.summary.statement_count >= 1,
"CREATE VIEW with CTE should parse"
);
}
#[test]
fn ddl_create_temp_table() {
let sql = r#"
CREATE TEMP TABLE daily_summary AS
SELECT
DATE(created_at) AS date,
COUNT(*) AS event_count,
COUNT(DISTINCT user_id) AS unique_users
FROM events
WHERE created_at >= CURRENT_DATE
GROUP BY DATE(created_at);
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("events"),
"CREATE TEMP TABLE should track source"
);
}
#[test]
fn forward_declared_tables_are_known_before_usage() {
let sql = r#"
CREATE VIEW future_view AS
SELECT id FROM future_table;
CREATE TABLE future_table (id INT, name TEXT);
SELECT * FROM future_view;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let unresolved_count = result
.issues
.iter()
.filter(|issue| issue.code == issue_codes::UNRESOLVED_REFERENCE)
.count();
assert_eq!(
unresolved_count, 0,
"forward references should not emit UNRESOLVED_REFERENCE warnings"
);
let unknown_column_count = result
.issues
.iter()
.filter(|issue| issue.code == issue_codes::UNKNOWN_COLUMN)
.count();
assert_eq!(
unknown_column_count, 0,
"columns from forward-declared tables should be known"
);
}
#[test]
fn ddl_multi_statement_temp_table_pipeline() {
let sql = r#"
CREATE TEMP TABLE bronze AS
SELECT * FROM raw_events WHERE valid = true;
CREATE TEMP TABLE silver AS
SELECT event_id, user_id, event_type, created_at
FROM bronze
WHERE event_type IS NOT NULL;
CREATE TABLE gold AS
SELECT
user_id,
event_type,
COUNT(*) AS event_count
FROM silver
GROUP BY user_id, event_type;
SELECT * FROM gold ORDER BY event_count DESC LIMIT 100;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
assert_eq!(
result.summary.statement_count, 4,
"temp table pipeline should have 4 statements"
);
let tables = collect_table_names(&result);
for expected in ["raw_events", "bronze", "silver", "gold"] {
assert!(
tables.contains(expected),
"pipeline should track {expected}"
);
}
let cross_edges: Vec<_> = result
.edges
.iter()
.filter(|edge| edge.edge_type == EdgeType::CrossStatement)
.collect();
assert!(
cross_edges.len() >= 3,
"temp table pipeline should have cross-statement edges"
);
}
#[test]
fn view_and_table_in_same_statement() {
let sql = r#"
CREATE VIEW active_users AS SELECT id, name FROM users WHERE active = true;
CREATE TABLE orders (order_id INT, user_id INT, amount DECIMAL);
SELECT v.name, o.amount
FROM active_users v
JOIN orders o ON v.id = o.user_id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert_eq!(
result.summary.statement_count, 3,
"Should have 3 statements"
);
let view_node = result
.nodes_in_statement(0)
.find(|n| &*n.label == "active_users");
assert!(view_node.is_some(), "Should find view node");
assert_eq!(
view_node.unwrap().node_type,
NodeType::View,
"active_users should be a View"
);
let table_node = result.nodes_in_statement(1).find(|n| &*n.label == "orders");
assert!(table_node.is_some(), "Should find orders table node");
assert_eq!(
table_node.unwrap().node_type,
NodeType::Table,
"orders should be a Table"
);
}
#[test]
fn cross_statement_view_lineage() {
let sql = r#"
CREATE VIEW user_orders AS
SELECT u.id, u.name, o.order_id
FROM users u
JOIN orders o ON u.id = o.user_id;
SELECT name, COUNT(*) as order_count
FROM user_orders
GROUP BY name;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert_eq!(
result.summary.statement_count, 2,
"Should have 2 statements"
);
let cross_edges: Vec<_> = result
.edges
.iter()
.filter(|edge| edge.edge_type == EdgeType::CrossStatement)
.collect();
assert!(
!cross_edges.is_empty(),
"Should have cross-statement edges linking view creation to its usage"
);
let global_view = result.nodes.iter().find(|n| &*n.label == "user_orders");
assert!(global_view.is_some(), "Should find view in global lineage");
assert_eq!(
global_view.unwrap().node_type,
NodeType::View,
"View should retain View type in global lineage"
);
}
#[test]
fn mixed_table_view_cte_in_pipeline() {
let sql = r#"
CREATE TABLE raw_events (event_id INT, user_id INT, event_type VARCHAR(50));
CREATE VIEW filtered_events AS
SELECT event_id, user_id, event_type
FROM raw_events
WHERE event_type IN ('click', 'purchase');
WITH event_counts AS (
SELECT user_id, COUNT(*) as cnt
FROM filtered_events
GROUP BY user_id
)
SELECT u.name, ec.cnt
FROM users u
JOIN event_counts ec ON u.id = ec.user_id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert_eq!(
result.summary.statement_count, 3,
"Should have 3 statements"
);
let mut table_count = 0;
let mut view_count = 0;
let mut cte_count = 0;
for node in &result.nodes {
match node.node_type {
NodeType::Table => table_count += 1,
NodeType::View => view_count += 1,
NodeType::Cte => cte_count += 1,
NodeType::Column | NodeType::Output => {}
}
}
assert!(
table_count >= 2,
"Should have at least 2 tables (raw_events, users)"
);
assert!(
view_count >= 1,
"Should have at least 1 view (filtered_events)"
);
assert!(cte_count >= 1, "Should have at least 1 CTE (event_counts)");
}
#[test]
fn node_type_helper_methods() {
assert!(
NodeType::Table.is_table_like(),
"Table should be table-like"
);
assert!(NodeType::View.is_table_like(), "View should be table-like");
assert!(NodeType::Cte.is_table_like(), "Cte should be table-like");
assert!(
!NodeType::Column.is_table_like(),
"Column should not be table-like"
);
assert!(
NodeType::Table.is_table_or_view(),
"Table should be table-or-view"
);
assert!(
NodeType::View.is_table_or_view(),
"View should be table-or-view"
);
assert!(
!NodeType::Cte.is_table_or_view(),
"Cte should NOT be table-or-view"
);
assert!(
!NodeType::Column.is_table_or_view(),
"Column should not be table-or-view"
);
}
#[test]
fn view_referenced_multiple_times() {
let sql = r#"
CREATE VIEW product_summary AS
SELECT product_id, SUM(quantity) as total_qty
FROM order_items
GROUP BY product_id;
SELECT * FROM product_summary WHERE total_qty > 100;
SELECT * FROM product_summary WHERE total_qty < 10;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert_eq!(
result.summary.statement_count, 3,
"Should have 3 statements"
);
let view_nodes: Vec<_> = result
.nodes
.iter()
.filter(|n| &*n.label == "product_summary")
.collect();
assert_eq!(
view_nodes.len(),
1,
"View should appear exactly once in global lineage"
);
let view_node = view_nodes[0];
assert!(
view_node.statement_ids.len() >= 2,
"View should be referenced by multiple statements"
);
}
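// Scale: deep CTE chains, wide projections, many UNION branches, long join
// chains, and a multi-stage ETL script.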
#[test]
fn scale_deeply_nested_ctes() {
let sql = r#"
WITH
l1 AS (SELECT id FROM orders),
l2 AS (SELECT id FROM l1),
l3 AS (SELECT id FROM l2),
l4 AS (SELECT id FROM l3),
l5 AS (SELECT id FROM l4),
l6 AS (SELECT id FROM l5),
l7 AS (SELECT id FROM l6),
l8 AS (SELECT id FROM l7),
l9 AS (SELECT id FROM l8),
l10 AS (SELECT id FROM l9)
SELECT * FROM l10;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let ctes = collect_cte_names(&result);
assert_eq!(ctes.len(), 10, "deeply nested CTEs should all be tracked");
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"base table should be preserved through deep nesting"
);
}
#[test]
fn scale_wide_select_many_columns() {
let columns: Vec<String> = (1..=50)
.map(|i| format!("col{} AS output{}", i, i))
.collect();
let sql = format!("SELECT {} FROM wide_table;", columns.join(", "));
let result = run_analysis(&sql, Dialect::Generic, None);
let stmt = first_statement(&result);
assert!(
stmt.nodes
.iter()
.filter(|n| n.node_type == NodeType::Column)
.count()
>= 50,
"wide SELECT should track all columns"
);
}
#[test]
fn scale_many_union_branches() {
let sql = r#"
SELECT id, 'source1' AS source FROM table1
UNION ALL
SELECT id, 'source2' FROM table2
UNION ALL
SELECT id, 'source3' FROM table3
UNION ALL
SELECT id, 'source4' FROM table4
UNION ALL
SELECT id, 'source5' FROM table5
UNION ALL
SELECT id, 'source6' FROM table6
UNION ALL
SELECT id, 'source7' FROM table7
UNION ALL
SELECT id, 'source8' FROM table8;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert_eq!(
tables.len(),
8,
"many UNION branches should track all source tables"
);
}
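// A companion sketch, not part of the original suite: it assumes table
// tracking scales linearly with branch count, as the eight-branch case above
// demonstrates, and builds the UNION programmatically in the same style as
// scale_wide_select_many_columns above.
#[test]
fn scale_generated_union_branches() {
    let branches: Vec<String> = (1..=12)
        .map(|i| format!("SELECT id, 'source{i}' AS source FROM table{i}"))
        .collect();
    let sql = format!("{};", branches.join("\nUNION ALL\n"));
    let result = run_analysis(&sql, Dialect::Generic, None);
    let tables = collect_table_names(&result);
    assert_eq!(
        tables.len(),
        12,
        "generated UNION branches should track all 12 source tables"
    );
}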
#[test]
fn scale_long_join_chain() {
let sql = r#"
SELECT
t1.id,
t2.value AS v2,
t3.value AS v3,
t4.value AS v4,
t5.value AS v5,
t6.value AS v6
FROM table1 t1
JOIN table2 t2 ON t1.id = t2.ref_id
JOIN table3 t3 ON t2.id = t3.ref_id
JOIN table4 t4 ON t3.id = t4.ref_id
JOIN table5 t5 ON t4.id = t5.ref_id
JOIN table6 t6 ON t5.id = t6.ref_id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert_eq!(tables.len(), 6, "long JOIN chain should track all tables");
}
#[test]
fn scale_complex_multi_statement_etl() {
let sql = r#"
-- Stage 1: Extract
CREATE TABLE staging_raw AS
SELECT * FROM external_data WHERE loaded_at >= CURRENT_DATE;
-- Stage 2: Clean
CREATE TABLE staging_clean AS
SELECT
id,
TRIM(name) AS name,
LOWER(email) AS email,
CAST(created_at AS DATE) AS created_date
FROM staging_raw
WHERE email IS NOT NULL;
-- Stage 3: Enrich
CREATE TABLE staging_enriched AS
SELECT
sc.id,
sc.name,
sc.email,
sc.created_date,
d.region,
d.segment
FROM staging_clean sc
LEFT JOIN dimensions d ON sc.id = d.customer_id;
-- Stage 4: Aggregate
INSERT INTO summary_table
SELECT
region,
segment,
DATE_TRUNC('month', created_date) AS month,
COUNT(*) AS customer_count,
COUNT(DISTINCT email) AS unique_emails
FROM staging_enriched
GROUP BY region, segment, DATE_TRUNC('month', created_date);
-- Stage 5: Report
SELECT
region,
SUM(customer_count) AS total_customers
FROM summary_table
WHERE month >= DATE_TRUNC('year', CURRENT_DATE)
GROUP BY region
ORDER BY total_customers DESC;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
assert_eq!(
result.summary.statement_count, 5,
"complex ETL should track all 5 stages"
);
let tables = collect_table_names(&result);
for expected in [
"external_data",
"staging_raw",
"staging_clean",
"staging_enriched",
"dimensions",
"summary_table",
] {
assert!(
tables.contains(expected),
"ETL pipeline should track {expected}"
);
}
let cross_edges: Vec<_> = result
.edges
.iter()
.filter(|edge| edge.edge_type == EdgeType::CrossStatement)
.collect();
assert!(
cross_edges.len() >= 4,
"complex ETL should have multiple cross-statement edges"
);
}
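// Column-level lineage: ownership, data-flow, and derivation edges through
// projections, aggregations, joins, subqueries, derived tables, and UNIONs.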
#[test]
fn column_ownership_edges_link_tables_to_columns() {
let sql = r#"
SELECT id, name, email FROM users;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let ownership_edges = edges_by_type(&stmt, EdgeType::Ownership);
assert!(
!ownership_edges.is_empty(),
"should have ownership edges from table to columns"
);
let table = find_table_node(&stmt, "users");
assert!(table.is_some(), "users table should exist as node");
for col_name in ["id", "name", "email"] {
let col = find_column_node(&stmt, col_name);
assert!(col.is_some(), "column {col_name} should exist as node");
}
}
#[test]
fn column_dataflow_edges_track_simple_projection() {
let sql = r#"
WITH source AS (
SELECT user_id, email FROM users
)
SELECT user_id, email FROM source;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let dataflow_edges = edges_by_type(&stmt, EdgeType::DataFlow);
assert!(
!dataflow_edges.is_empty(),
"should have data flow edges between columns"
);
assert!(
find_column_node(&stmt, "user_id").is_some(),
"user_id column should exist"
);
assert!(
find_column_node(&stmt, "email").is_some(),
"email column should exist"
);
}
#[test]
fn column_derivation_edges_capture_transformations() {
let sql = r#"
SELECT
user_id,
amount * 1.1 AS amount_with_tax,
UPPER(name) AS name_upper
FROM orders;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let derivation_edges = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
derivation_edges.len() >= 2,
"should have derivation edges for computed columns"
);
let amount_with_tax = find_column_node(&stmt, "amount_with_tax");
let name_upper = find_column_node(&stmt, "name_upper");
assert!(
amount_with_tax.is_some() || name_upper.is_some(),
"derived columns should exist as nodes"
);
}
#[test]
fn column_qualified_names_preserve_table_context() {
let sql = r#"
SELECT
o.order_id,
o.amount,
u.user_id,
u.name
FROM orders o
JOIN users u ON o.user_id = u.user_id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let cols = column_labels(&stmt);
for expected in ["order_id", "amount", "user_id", "name"] {
assert!(
cols.contains(&expected.to_string()),
"column {expected} should be tracked"
);
}
assert!(
find_table_node(&stmt, "orders").is_some(),
"orders table should exist"
);
assert!(
find_table_node(&stmt, "users").is_some(),
"users table should exist"
);
}
#[test]
fn column_lineage_through_aggregation() {
let sql = r#"
SELECT
user_id,
COUNT(*) AS order_count,
SUM(amount) AS total_amount,
AVG(amount) AS avg_amount
FROM orders
GROUP BY user_id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
!derivations.is_empty(),
"aggregation should create derivation edges"
);
for col in ["user_id", "order_count", "total_amount", "avg_amount"] {
assert!(
find_column_node(&stmt, col).is_some(),
"output column {col} should exist"
);
}
}
#[test]
fn column_lineage_through_join_preserves_sources() {
let sql = r#"
SELECT
o.order_id,
p.payment_id,
o.amount,
p.payment_method
FROM orders o
JOIN payments p ON o.order_id = p.order_id;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let cols = column_labels(&stmt);
for expected in ["order_id", "payment_id", "amount", "payment_method"] {
assert!(
cols.contains(&expected.to_string()),
"joined column {expected} should exist"
);
}
let ownership = edges_by_type(&stmt, EdgeType::Ownership);
assert!(
ownership.len() >= 2,
"should have ownership edges from both joined tables"
);
}
#[test]
fn column_expression_text_captured_for_derived_columns() {
let sql = r#"
SELECT
order_id,
CASE
WHEN amount > 1000 THEN 'high'
WHEN amount > 100 THEN 'medium'
ELSE 'low'
END AS amount_tier
FROM orders;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let amount_tier = find_column_node(&stmt, "amount_tier");
assert!(
amount_tier.is_some(),
"derived column amount_tier should exist"
);
    if let Some(node) = amount_tier {
        // Expression capture is informational only: the analyzer may or may
        // not record the CASE text, so presence is observed but not asserted.
        let _has_expression = node.expression.is_some();
    }
}
#[test]
fn column_lineage_multi_level_cte_chain() {
let sql = r#"
WITH stage1 AS (
SELECT user_id, amount FROM orders
),
stage2 AS (
SELECT user_id, amount * 2 AS doubled_amount FROM stage1
),
stage3 AS (
SELECT user_id, doubled_amount + 100 AS final_amount FROM stage2
)
SELECT user_id, final_amount FROM stage3;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
assert!(
find_column_node(&stmt, "user_id").is_some(),
"user_id should exist"
);
let cols = column_labels(&stmt);
assert!(
cols.contains(&"final_amount".to_string()),
"final derived column should exist"
);
let dataflow = edges_by_type(&stmt, EdgeType::DataFlow);
let derivation = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
!dataflow.is_empty() || !derivation.is_empty(),
"should have edges connecting CTE stages"
);
}
#[test]
fn column_wildcard_expansion_with_schema() {
let sql = r#"
SELECT * FROM users;
"#;
let schema = SchemaMetadata {
allow_implied: true,
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
tables: vec![schema_table(
None,
None,
"users",
&["id", "name", "email", "created_at"],
)],
};
let result = run_analysis(sql, Dialect::Generic, Some(schema));
let stmt = first_statement(&result);
let cols = column_labels(&stmt);
assert!(
!cols.is_empty(),
"SELECT * with schema should produce column nodes"
);
}
#[test]
fn column_subquery_column_propagation() {
let sql = r#"
SELECT
user_id,
total_orders,
total_amount
FROM (
SELECT
user_id,
COUNT(*) AS total_orders,
SUM(amount) AS total_amount
FROM orders
GROUP BY user_id
) AS subq
WHERE total_orders > 5;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
for col in ["user_id", "total_orders", "total_amount"] {
assert!(
find_column_node(&stmt, col).is_some(),
"column {col} from subquery should be tracked"
);
}
assert!(
!stmt.edges.is_empty(),
"should have edges for column propagation from subquery"
);
}
#[test]
fn derived_table_alias_tracks_column_flow() {
let sql = r#"
SELECT sub.total_amount
FROM (
SELECT SUM(amount) AS total_amount
FROM orders
) AS sub
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let derived_node = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Cte && &*node.label == "sub")
.expect("derived table node should exist");
let derived_column_id = stmt
.edges
.iter()
.find(|edge| edge.edge_type == EdgeType::Ownership && edge.from == derived_node.id)
.map(|edge| edge.to.clone())
.expect("derived table should own columns");
assert!(
stmt.edges
.iter()
.any(|edge| { edge.edge_type == EdgeType::DataFlow && edge.from == derived_column_id }),
"derived column should feed outer SELECT via data flow edges"
);
assert!(
stmt.edges
.iter()
.any(|edge| { edge.edge_type == EdgeType::Derivation && edge.to == derived_column_id }),
"orders.amount should derive the intermediate column before projection"
);
}
#[test]
fn derived_table_alias_does_not_shadow_cte_with_same_name() {
let sql = r#"
WITH sales AS (
SELECT order_id
FROM orders
)
SELECT order_id
FROM (
SELECT order_id
FROM web_orders
) AS sales
UNION ALL
SELECT order_id
FROM sales;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let cte_node = stmt
.nodes
.iter()
.find(|node| {
node.node_type == NodeType::Cte
&& node.id.starts_with("cte_")
&& &*node.label == "sales"
})
.expect("original sales CTE should exist");
let cte_columns: HashSet<_> = stmt
.edges
.iter()
.filter(|edge| edge.edge_type == EdgeType::Ownership && edge.from == cte_node.id)
.map(|edge| edge.to.clone())
.collect();
assert!(
!cte_columns.is_empty(),
"sales CTE should expose columns for downstream references"
);
let cte_flows_into_union = stmt
.edges
.iter()
.any(|edge| edge.edge_type == EdgeType::DataFlow && cte_columns.contains(&edge.from));
assert!(
cte_flows_into_union,
"sales CTE columns should feed into the UNION output even when a derived table reuses the alias"
);
}
#[test]
fn column_union_combines_column_sets() {
let sql = r#"
SELECT user_id, amount FROM orders
UNION ALL
SELECT customer_id AS user_id, payment_amount AS amount FROM payments;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let cols = column_labels(&stmt);
assert!(
cols.contains(&"user_id".to_string()),
"UNION output should have user_id column"
);
assert!(
cols.contains(&"amount".to_string()),
"UNION output should have amount column"
);
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"first UNION branch table should be tracked"
);
assert!(
tables.contains("payments"),
"second UNION branch table should be tracked"
);
}
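// Implied schema resolution: CTAS statements contribute resolved schemas, and
// join predicates imply foreign-key relationships between source tables.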
#[test]
fn ctas_implied_schema_ignores_inner_columns() {
let sql = r#"
CREATE TABLE tgt AS
SELECT id
FROM (
SELECT id, extra FROM source
) s
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let resolved = result
.resolved_schema
.expect("resolved schema should be present");
let tgt_table = resolved
.tables
.iter()
.find(|table| table.name == "tgt")
.expect("tgt table should exist in resolved schema");
let columns: Vec<_> = tgt_table
.columns
.iter()
.map(|col| col.name.clone())
.collect();
assert_eq!(
columns,
vec!["id"],
"tgt schema should only include columns from the outer projection"
);
}
#[test]
fn implied_schema_captures_join_relationships() {
let sql = r#"
CREATE TABLE b AS
SELECT
CAST(t1.a AS INT) AS a,
CAST(t1.b AS INT) AS b,
CAST(t1.c AS INT) AS c
FROM table1 AS t1
LEFT JOIN table2 AS t2 ON t1.a = t2.a
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let resolved = result
.resolved_schema
.expect("resolved schema should be present");
let table1 = resolved
.tables
.iter()
.find(|table| table.name == "table1")
.expect("table1 should exist in resolved schema");
let table2 = resolved
.tables
.iter()
.find(|table| table.name == "table2")
.expect("table2 should exist in resolved schema");
let table1_fk = table1.constraints.iter().find(|constraint| {
constraint.constraint_type == ConstraintType::ForeignKey
&& constraint.referenced_table.as_deref() == Some("table2")
&& constraint.columns == ["a"]
&& constraint
.referenced_columns
.as_ref()
.map(|cols| cols.as_slice() == ["a"])
.unwrap_or(false)
});
assert!(
table1_fk.is_some(),
"table1 should have a foreign key constraint to table2"
);
let table2_fk = table2.constraints.iter().find(|constraint| {
constraint.constraint_type == ConstraintType::ForeignKey
&& constraint.referenced_table.as_deref() == Some("table1")
&& constraint.columns == ["a"]
&& constraint
.referenced_columns
.as_ref()
.map(|cols| cols.as_slice() == ["a"])
.unwrap_or(false)
});
assert!(
table2_fk.is_some(),
"table2 should have a foreign key constraint to table1"
);
let table1_column = table1
.columns
.iter()
.find(|column| column.name == "a")
.expect("table1.a should exist");
let table2_column = table2
.columns
.iter()
.find(|column| column.name == "a")
.expect("table2.a should exist");
let table1_fk_ref = table1_column
.foreign_key
.as_ref()
.expect("table1.a should have foreign key metadata");
assert_eq!(table1_fk_ref.table, "table2");
assert_eq!(table1_fk_ref.column, "a");
let table2_fk_ref = table2_column
.foreign_key
.as_ref()
.expect("table2.a should have foreign key metadata");
assert_eq!(table2_fk_ref.table, "table1");
assert_eq!(table2_fk_ref.column, "a");
}
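// General ANSI features and lexical edge cases: LATERAL joins, window frames,
// cast variants, HAVING subqueries, quoted identifiers, and comments.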
#[test]
fn ansi_lateral_join_standard_syntax() {
let sql = r#"
SELECT u.id, l.last_order_date
FROM users u
LEFT JOIN LATERAL (
SELECT MAX(order_date) as last_order_date
FROM orders o
WHERE o.user_id = u.id
) l ON true;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("users") && tables.contains("orders"),
"LATERAL JOIN should track both tables"
);
}
#[test]
fn ansi_window_frame_clause_ignored_but_preserved() {
let sql = r#"
SELECT
amount,
SUM(amount) OVER (
PARTITION BY user_id
ORDER BY date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as cumulative_sum
FROM transactions;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("transactions"),
"Window frame should not break lineage"
);
}
#[test]
fn ansi_cast_syntax_variants() {
let sql = r#"
SELECT
CAST(price AS INTEGER) as price_int,
quantity::FLOAT as quantity_float,
SAFE_CAST(date_str AS DATE) as safe_date
FROM sales;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
derivations.len() >= 3,
"All cast variants should produce derivation edges"
);
}
#[test]
fn ansi_having_subquery_lineage() {
let sql = r#"
SELECT user_id, SUM(amount)
FROM orders
GROUP BY user_id
HAVING SUM(amount) > (SELECT AVG(target) FROM sales_targets);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(tables.contains("orders"));
assert!(
tables.contains("sales_targets"),
"Subquery in HAVING should be tracked"
);
}
#[test]
fn quoted_identifiers_and_case_sensitivity() {
let sql = r#"
SELECT "U".id, "U"."Email Address"
FROM "Users" "U"
WHERE "U"."ActiveStatus" = 'Active';
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
let has_users = tables.contains("Users") || tables.contains("users");
assert!(has_users, "Quoted table name should be tracked");
}
#[test]
fn comments_handling_blocks_and_inline() {
let sql = r#"
/*
Block comment
spanning multiple lines
*/
SELECT * -- Inline comment
FROM /* comment in middle */ users;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
assert!(tables.contains("users"), "Comments should be ignored");
}
#[test]
fn column_lineage_cte_transformation_chain_with_reuse() {
let sql = r#"
WITH transformed AS (
SELECT
id,
UPPER(name) as name_upper,
LOWER(email) as email_lower
FROM users
)
SELECT
id,
name_upper,
CONCAT(name_upper, ' - ', email_lower) as display_name
FROM transformed;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let tables = collect_table_names(&result);
assert!(
tables.contains("users"),
"source table 'users' should be tracked"
);
let ctes = collect_cte_names(&result);
assert!(
ctes.contains("transformed"),
"CTE 'transformed' should be tracked"
);
let users_table = find_table_node(&stmt, "users");
let transformed_cte = stmt
.nodes
.iter()
.find(|n| n.node_type == NodeType::Cte && &*n.label == "transformed");
assert!(users_table.is_some(), "users table node should exist");
assert!(
transformed_cte.is_some(),
"transformed CTE node should exist"
);
let ownership_edges = edges_by_type(&stmt, EdgeType::Ownership);
assert!(
!ownership_edges.is_empty(),
"should have ownership edges linking tables/CTEs to their columns"
);
assert!(
find_column_node(&stmt, "id").is_some(),
"passthrough column 'id' should exist"
);
assert!(
find_column_node(&stmt, "name_upper").is_some(),
"CTE derived column 'name_upper' should exist"
);
assert!(
find_column_node(&stmt, "email_lower").is_some(),
"CTE derived column 'email_lower' should exist"
);
assert!(
find_column_node(&stmt, "display_name").is_some(),
"final derived column 'display_name' should exist"
);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
derivations.len() >= 3,
"should have derivation edges for UPPER, LOWER, and CONCAT transformations"
);
let dataflow = edges_by_type(&stmt, EdgeType::DataFlow);
assert!(
!dataflow.is_empty(),
"should have data flow edges from CTE columns to final SELECT"
);
let display_name_col = find_column_node(&stmt, "display_name");
if let Some(node) = display_name_col {
        // Expression capture is informational only and not asserted here.
        let _has_expr = node.expression.is_some();
let display_derivations: Vec<_> = derivations
.iter()
.filter(|edge| edge.to == node.id)
.collect();
assert!(
!display_derivations.is_empty(),
"display_name should have incoming derivation edges from source columns"
);
}
eprintln!("\n=== TABLE/CTE NODES ===");
for node in &stmt.nodes {
if node.node_type == NodeType::Table || node.node_type == NodeType::Cte {
eprintln!("{:?}: {}", node.node_type, node.label);
}
}
eprintln!("\n=== COLUMN NODES (sample) ===");
for node in stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Column)
.take(8)
{
eprintln!(
"Column: {} (expr: {:?})",
node.label,
node.expression.as_ref().map(|e| &e[..50.min(e.len())])
);
}
eprintln!("\n=== EDGE PATHS (sample) ===");
for edge in stmt.edges.iter().take(12) {
let from = stmt.nodes.iter().find(|n| n.id == edge.from);
let to = stmt.nodes.iter().find(|n| n.id == edge.to);
if let (Some(f), Some(t)) = (from, to) {
eprintln!(
"{:?}: {:?}({}) -> {:?}({})",
edge.edge_type, f.node_type, f.label, t.node_type, t.label
);
}
}
eprintln!("\n=== EDGE SUMMARY ===");
eprintln!("Ownership: {}", ownership_edges.len());
eprintln!("DataFlow: {}", dataflow.len());
eprintln!("Derivation: {}", derivations.len());
if let (Some(users), Some(transformed)) = (users_table, transformed_cte) {
let table_to_cte_edge = stmt
.edges
.iter()
.find(|e| e.from == users.id && e.to == transformed.id);
assert!(
table_to_cte_edge.is_some(),
"CRITICAL: should have direct edge from users table -> transformed CTE"
);
eprintln!(
"\n✅ CONFIRMED TABLE PATH: users -> transformed (edge type: {:?})",
table_to_cte_edge.map(|e| e.edge_type)
);
let users_owns_cols: Vec<_> = ownership_edges
.iter()
.filter(|e| e.from == users.id)
.collect();
assert!(
!users_owns_cols.is_empty(),
"users table should own columns (users -> name, email, id)"
);
eprintln!("users owns {} columns", users_owns_cols.len());
let transformed_owns_cols: Vec<_> = ownership_edges
.iter()
.filter(|e| e.from == transformed.id)
.collect();
assert!(
!transformed_owns_cols.is_empty(),
"transformed CTE should own columns (transformed -> id, name_upper, email_lower)"
);
eprintln!("transformed owns {} columns", transformed_owns_cols.len());
eprintln!("\n=== COLUMN OWNERSHIP VERIFICATION ===");
let mut columns_by_name: std::collections::HashMap<String, Vec<(&Node, Vec<&Node>)>> =
std::collections::HashMap::new();
for node in stmt.nodes.iter().copied() {
if node.node_type == NodeType::Column {
let owners: Vec<&Node> = ownership_edges
.iter()
.filter(|e| e.to == node.id)
.filter_map(|e| stmt.nodes.iter().copied().find(|n| n.id == e.from))
.collect();
columns_by_name
.entry(node.label.to_string())
.or_default()
.push((node, owners));
}
}
for (col_name, instances) in &columns_by_name {
eprintln!("\nColumn '{}': {} instance(s)", col_name, instances.len());
for (i, (node, owners)) in instances.iter().enumerate() {
eprintln!(
" [{}] id={}, owned by: {:?}",
i,
                    &node.id[..8.min(node.id.len())],
owners
.iter()
.map(|n| format!("{:?}({})", n.node_type, n.label))
.collect::<Vec<_>>()
);
}
}
let users_owned_cols: Vec<&Node> = ownership_edges
.iter()
.filter(|e| e.from == users.id)
.filter_map(|e| stmt.nodes.iter().copied().find(|n| n.id == e.to))
.collect();
let users_col_names: Vec<_> = users_owned_cols.iter().map(|n| &*n.label).collect();
eprintln!("\nusers owns: {:?}", users_col_names);
for col in &users_owned_cols {
assert!(
&*col.label != "name_upper" && &*col.label != "email_lower",
"🐛 BUG DETECTED: users table incorrectly owns derived column '{}' (should only be in transformed CTE)",
col.label
);
}
let transformed_col_names: Vec<_> = transformed_owns_cols
.iter()
.filter_map(|e| stmt.nodes.iter().copied().find(|n| n.id == e.to))
.map(|n| &*n.label)
.collect();
eprintln!("transformed owns: {:?}", transformed_col_names);
eprintln!("\n=== EXPECTED vs ACTUAL ===");
eprintln!("Expected users columns: [id, name, email]");
eprintln!("Expected transformed columns: [id, name_upper, email_lower]");
eprintln!("Expected final SELECT columns: [id, name_upper, display_name]");
eprintln!("\nActual users owns: {:?}", users_col_names);
eprintln!("Actual transformed owns: {:?}", transformed_col_names);
}
}
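// Join edge semantics: joins must not produce table-to-table edges; tables
// that appear only in join predicates connect to the output through
// JoinDependency edges instead.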
#[test]
fn joined_tables_all_present_without_join_edges() {
let sql = r#"
SELECT
o.order_id,
c.customer_name,
oi.quantity,
p.product_name
FROM orders o
INNER JOIN customers c ON o.customer_id = c.id
LEFT JOIN order_items oi ON o.order_id = oi.order_id
LEFT JOIN products p ON oi.product_id = p.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let table_names: Vec<String> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.map(|n| n.label.to_string())
.collect();
eprintln!("Tables found: {:?}", table_names);
assert!(table_names.contains(&"orders".to_string()));
assert!(table_names.contains(&"customers".to_string()));
assert!(table_names.contains(&"order_items".to_string()));
assert!(table_names.contains(&"products".to_string()));
    let table_ids: HashSet<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.map(|n| &n.id)
.collect();
let table_to_table_edges: Vec<&Edge> = stmt
.edges
.iter()
.copied()
.filter(|e| table_ids.contains(&e.from) && table_ids.contains(&e.to))
.collect();
assert!(
table_to_table_edges.is_empty(),
"Should not have table-to-table edges for joins; found {:?}",
table_to_table_edges
);
let data_flow_edges = edges_by_type(&stmt, EdgeType::DataFlow);
assert!(
!data_flow_edges.is_empty(),
"Should have column-level data_flow edges"
);
}
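// A table referenced only in a JOIN clause (contributing no projected columns)
// still shapes the result set, so it must reach the Output node via a
// JoinDependency edge. The tests below exercise plain projections, COUNT(*),
// DISTINCT, literal projections, wildcards, and the column-lineage-disabled mode.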
#[test]
fn join_only_tables_emit_output_dependency() {
let sql = r#"
SELECT
t1.a,
t1.b
FROM table1 t1
LEFT JOIN table2 t2 ON t1.a = t2.a
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let output_node = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Output)
.expect("Output node should exist");
let table2_node = find_table_node(&stmt, "table2").expect("table2 not found");
let join_dependency = stmt.edges.iter().find(|edge| {
edge.edge_type == EdgeType::JoinDependency
&& edge.from == table2_node.id
&& edge.to == output_node.id
});
assert!(
join_dependency.is_some(),
"join-only table should connect to output"
);
}
#[test]
fn join_only_tables_emit_output_dependency_for_count_star() {
let sql = r#"
SELECT COUNT(*)
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let output_node = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Output)
.expect("Output node should exist");
let orders_node = find_table_node(&stmt, "orders").expect("orders not found");
let join_dependency = stmt.edges.iter().find(|edge| {
edge.edge_type == EdgeType::JoinDependency
&& edge.from == orders_node.id
&& edge.to == output_node.id
});
assert!(
join_dependency.is_some(),
"join-only table should connect to output for COUNT(*) queries"
);
}
#[test]
fn count_star_keeps_base_table_connected_to_output() {
let sql = r#"
SELECT COUNT(*)
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let users_node = find_table_node(&stmt, "users").expect("users not found");
let count_node = find_column_node(&stmt, "count").expect("count output column not found");
let orders_node = find_table_node(&stmt, "orders").expect("orders not found");
let output_node = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Output)
.expect("Output node should exist");
let users_dependency = stmt.edges.iter().find(|edge| {
edge.edge_type == EdgeType::Derivation
&& edge.from == users_node.id
&& edge.to == count_node.id
});
assert!(
users_dependency.is_some(),
"base table should connect to COUNT(*) output column"
);
let orders_dependency = stmt.edges.iter().find(|edge| {
edge.edge_type == EdgeType::JoinDependency
&& edge.from == orders_node.id
&& edge.to == output_node.id
});
assert!(
orders_dependency.is_some(),
"joined table should still connect to output via join dependency"
);
}
#[test]
fn select_literal_keeps_base_table_connected_to_output() {
let sql = r#"
SELECT 1
FROM users
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let users_node = find_table_node(&stmt, "users").expect("users not found");
let literal_col = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Column)
.expect("literal output column not found");
let dependency = stmt
.edges
.iter()
.find(|edge| edge.from == users_node.id && edge.to == literal_col.id);
assert!(
dependency.is_some(),
"base table should connect to literal output column"
);
}
#[test]
fn count_star_self_join_creates_multiple_dependencies() {
let sql = r#"
SELECT COUNT(*)
FROM employees e1
LEFT JOIN employees e2 ON e1.manager_id = e2.id
LEFT JOIN employees e3 ON e2.manager_id = e3.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let output_node = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Output)
.expect("Output node should exist");
let joined_alias_ids: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::JoinDependency && e.join_type == Some(JoinType::Left))
.map(|e| e.from.clone())
.collect();
let joined_aliases: Vec<_> = stmt
.nodes
.iter()
.filter(|node| node.node_type == NodeType::Table && joined_alias_ids.contains(&node.id))
.collect();
assert_eq!(
joined_aliases.len(),
2,
"expected two joined employee aliases in self-join aggregate query"
);
for alias_node in joined_aliases {
let join_dependency = stmt.edges.iter().find(|edge| {
edge.edge_type == EdgeType::JoinDependency
&& edge.from == alias_node.id
&& edge.to == output_node.id
});
assert!(
join_dependency.is_some(),
"joined self-join alias {} should connect to output for COUNT(*) queries",
alias_node.id
);
}
}
#[test]
fn join_only_tables_emit_output_dependency_for_distinct_projection() {
let sql = r#"
SELECT DISTINCT u.id
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let output_node = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Output)
.expect("Output node should exist");
let orders_node = find_table_node(&stmt, "orders").expect("orders not found");
let join_dependency = stmt.edges.iter().find(|edge| {
edge.edge_type == EdgeType::JoinDependency
&& edge.from == orders_node.id
&& edge.to == output_node.id
});
assert!(
join_dependency.is_some(),
"join-only table should connect to output for DISTINCT projections"
);
}
#[test]
fn join_only_tables_emit_output_dependency_for_literal_projection() {
let sql = r#"
SELECT 1
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let output_node = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Output)
.expect("Output node should exist");
let orders_node = find_table_node(&stmt, "orders").expect("orders not found");
let join_dependency = stmt.edges.iter().find(|edge| {
edge.edge_type == EdgeType::JoinDependency
&& edge.from == orders_node.id
&& edge.to == output_node.id
});
assert!(
join_dependency.is_some(),
"join-only table should connect to output for literal projections"
);
}
#[test]
fn joined_tables_emit_output_dependency_when_column_lineage_disabled() {
let sql = r#"
SELECT u.id
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
"#;
let result = run_analysis_with_options(
sql,
Dialect::Generic,
None,
AnalysisOptions {
enable_column_lineage: Some(false),
..Default::default()
},
);
let stmt = first_statement(&result);
let output_node = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Output)
.expect("output node should exist");
let orders_node = find_table_node(&stmt, "orders").expect("orders not found");
let join_dependency = stmt.edges.iter().find(|edge| {
edge.edge_type == EdgeType::JoinDependency
&& edge.from == orders_node.id
&& edge.to == output_node.id
});
let join_dependency = join_dependency
.expect("joined table should still connect to the output when column lineage is disabled");
assert_eq!(join_dependency.join_type, Some(JoinType::Left));
assert_eq!(
join_dependency.join_condition.as_deref(),
Some("u.id = o.user_id")
);
}
#[test]
fn wildcard_join_contributors_do_not_emit_output_dependency_without_schema() {
let sql = r#"
SELECT *
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let orders_node = find_table_node(&stmt, "orders").expect("orders not found");
let join_dependency = stmt
.edges
.iter()
.find(|edge| edge.edge_type == EdgeType::JoinDependency && edge.from == orders_node.id);
assert!(
join_dependency.is_none(),
"joined table should not emit join dependency when SELECT * creates a direct output edge"
);
}
#[test]
fn qualified_wildcard_join_only_table_gets_dependency() {
let sql = r#"
SELECT u.*
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let output_node = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Output)
.expect("Output node should exist");
let orders_node = find_table_node(&stmt, "orders").expect("orders not found");
let join_dependency = stmt.edges.iter().find(|edge| {
edge.edge_type == EdgeType::JoinDependency
&& edge.from == orders_node.id
&& edge.to == output_node.id
});
assert!(
join_dependency.is_some(),
"join-only table must get a JoinDependency edge when only the other table's wildcard is selected"
);
}
#[test]
fn column_level_edges_from_joined_tables_carry_join_info() {
let sql = r#"
SELECT o.order_id, c.name
FROM orders o
INNER JOIN customers c ON o.customer_id = c.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let customers_node = find_table_node(&stmt, "customers").expect("customers not found");
let customer_column_ids: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::Ownership && e.from == customers_node.id)
.map(|e| e.to.clone())
.collect();
assert!(
!customer_column_ids.is_empty(),
"customers should own columns"
);
let has_join_info = stmt.edges.iter().any(|edge| {
matches!(edge.edge_type, EdgeType::DataFlow | EdgeType::Derivation)
&& customer_column_ids.contains(&edge.from)
&& edge.join_type == Some(JoinType::Inner)
});
assert!(
has_join_info,
"column-level edges from a joined table should carry join_type after propagation"
);
}
#[test]
fn propagated_join_info_does_not_overwrite_existing() {
let sql = r#"
SELECT u.id
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let orders_node = find_table_node(&stmt, "orders").expect("orders not found");
let join_dep = stmt
.edges
.iter()
.find(|e| e.edge_type == EdgeType::JoinDependency && e.from == orders_node.id)
.expect("JoinDependency edge should exist for join-only table");
assert_eq!(
join_dep.join_type,
Some(JoinType::Left),
"JoinDependency edge should retain its original join_type"
);
assert_eq!(
join_dep.join_condition.as_deref(),
Some("u.id = o.user_id"),
"JoinDependency edge should retain its original join_condition"
);
}
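// Filter attachment: WHERE/HAVING predicates are split on top-level AND and
// attached to the table whose columns they reference. OR groups stay intact,
// and cross-table or unresolvable predicates are not attached to any table.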
#[test]
fn where_filters_attached_to_correct_tables() {
let sql = r#"
SELECT o.order_id, c.customer_name
FROM orders o
INNER JOIN customers c ON o.customer_id = c.id
WHERE o.order_date >= '2024-01-01'
AND c.status = 'active'
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let orders_node = find_table_node(&stmt, "orders").expect("orders table not found");
let customers_node = find_table_node(&stmt, "customers").expect("customers table not found");
eprintln!("orders filters: {:?}", orders_node.filters);
eprintln!("customers filters: {:?}", customers_node.filters);
assert_eq!(
orders_node.filters.len(),
1,
"orders should have exactly one filter"
);
assert!(
orders_node.filters[0].expression.contains("order_date"),
"orders filter should mention order_date"
);
assert!(
!orders_node.filters[0].expression.contains("status"),
"orders filter should NOT mention status (that belongs to customers)"
);
assert_eq!(
customers_node.filters.len(),
1,
"customers should have exactly one filter"
);
assert!(
customers_node.filters[0].expression.contains("status"),
"customers filter should mention status"
);
assert!(
!customers_node.filters[0].expression.contains("order_date"),
"customers filter should NOT mention order_date (that belongs to orders)"
);
for filter in &orders_node.filters {
assert_eq!(filter.clause_type, FilterClauseType::Where);
}
for filter in &customers_node.filters {
assert_eq!(filter.clause_type, FilterClauseType::Where);
}
}
#[test]
fn having_filters_attached_correctly() {
let sql = r#"
SELECT
c.category,
SUM(p.price) as total_price,
COUNT(*) as product_count
FROM products p
JOIN categories c ON p.category_id = c.id
WHERE p.active = true
GROUP BY c.category
HAVING SUM(p.price) > 1000 AND COUNT(*) > 5
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let products_node = find_table_node(&stmt, "products").expect("products table not found");
let categories_node = find_table_node(&stmt, "categories").expect("categories table not found");
eprintln!("products filters: {:?}", products_node.filters);
eprintln!("categories filters: {:?}", categories_node.filters);
let products_where_filters: Vec<_> = products_node
.filters
.iter()
.filter(|f| f.clause_type == FilterClauseType::Where)
.collect();
assert_eq!(
products_where_filters.len(),
1,
"products should have one WHERE filter"
);
assert!(
products_where_filters[0].expression.contains("active"),
"products WHERE filter should mention 'active'"
);
let all_having_filters: Vec<_> = stmt
.nodes
.iter()
.flat_map(|n| &n.filters)
.filter(|f| f.clause_type == FilterClauseType::Having)
.collect();
assert!(
!all_having_filters.is_empty() || products_node.filters.len() > 1,
"HAVING predicates should be captured somewhere (as Having-typed filters or as extra filters on the aggregated table)"
);
}
#[test]
fn nested_or_predicates_not_split() {
let sql = r#"
SELECT * FROM users
WHERE (status = 'active' OR status = 'pending') AND age > 18
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let users_node = find_table_node(&stmt, "users").expect("users table not found");
eprintln!("users filters: {:?}", users_node.filters);
assert_eq!(
users_node.filters.len(),
2,
"Should split by top-level AND only, keeping OR grouped"
);
let has_or_filter = users_node
.filters
.iter()
.any(|f| f.expression.contains("OR") || f.expression.contains("pending"));
assert!(has_or_filter, "One filter should contain the OR expression");
}
#[test]
fn cross_table_predicate_not_attached_to_individual_tables() {
let sql = r#"
SELECT a.name, b.amount
FROM users a
JOIN orders b ON a.id = b.user_id
WHERE a.status = 'active' AND a.id = b.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let users_node = find_table_node(&stmt, "users").expect("users table not found");
let orders_node = find_table_node(&stmt, "orders").expect("orders table not found");
eprintln!("users filters: {:?}", users_node.filters);
eprintln!("orders filters: {:?}", orders_node.filters);
assert_eq!(
users_node.filters.len(),
1,
"users should have exactly one filter (the single-table predicate)"
);
assert!(
users_node.filters[0].expression.contains("status"),
"users filter should be the status predicate"
);
assert!(
orders_node.filters.is_empty(),
"orders should have no filters (cross-table predicate should be skipped), got: {:?}",
orders_node.filters
);
}
#[test]
fn same_table_qualified_and_unqualified_refs_are_not_treated_as_cross_table() {
let sql = r#"
SELECT u.id
FROM users u
WHERE u.id = id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let users_node = find_table_node(&stmt, "users").expect("users table not found");
assert_eq!(
users_node.filters.len(),
1,
"same-table qualified + unqualified predicate should be captured once"
);
assert!(
users_node.filters[0].expression.contains("u.id = id"),
"users filter should preserve the mixed predicate, got {:?}",
users_node.filters
);
}
#[test]
fn unresolvable_predicate_not_broadcast_to_all_tables() {
let sql = r#"
SELECT u.name, o.amount
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.status = 'active' AND random() > 0.5
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let users_node = find_table_node(&stmt, "users").expect("users table not found");
let orders_node = find_table_node(&stmt, "orders").expect("orders table not found");
eprintln!("users filters: {:?}", users_node.filters);
eprintln!("orders filters: {:?}", orders_node.filters);
assert_eq!(
users_node.filters.len(),
1,
"users should have exactly one filter"
);
assert!(
users_node.filters[0].expression.contains("status"),
"users filter should be the status predicate"
);
assert!(
orders_node.filters.is_empty(),
"orders should have no filters (unresolvable predicate should not broadcast), got: {:?}",
orders_node.filters
);
}
#[test]
fn multiple_join_types_captured() {
let sql = r#"
SELECT *
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.id
INNER JOIN products p ON o.product_id = p.id
FULL OUTER JOIN inventory i ON p.id = i.product_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let _orders_node = find_table_node(&stmt, "orders").expect("orders table not found");
let customers_node = find_table_node(&stmt, "customers").expect("customers table not found");
let products_node = find_table_node(&stmt, "products").expect("products table not found");
let inventory_node = find_table_node(&stmt, "inventory").expect("inventory table not found");
let find_join_edge = |node_id: &str| -> Option<&Edge> {
stmt.edges
.iter()
.copied()
.find(|e| e.from.as_ref() == node_id && e.join_type.is_some())
};
let customers_edge =
find_join_edge(customers_node.id.as_ref()).expect("customers should have a join edge");
assert_eq!(
customers_edge.join_type,
Some(JoinType::Left),
"customers edge should be LEFT joined"
);
assert!(
customers_edge.join_condition.is_some(),
"customers edge should have join condition"
);
let products_edge =
find_join_edge(products_node.id.as_ref()).expect("products should have a join edge");
assert_eq!(
products_edge.join_type,
Some(JoinType::Inner),
"products edge should be INNER joined"
);
assert!(
products_edge.join_condition.is_some(),
"products edge should have join condition"
);
let inventory_edge =
find_join_edge(inventory_node.id.as_ref()).expect("inventory should have a join edge");
assert_eq!(
inventory_edge.join_type,
Some(JoinType::Full),
"inventory edge should be FULL joined"
);
assert!(
inventory_edge.join_condition.is_some(),
"inventory edge should have join condition"
);
}
#[test]
fn cte_join_metadata_captured() {
let sql = r#"
WITH user_ltv AS (
SELECT user_id, COUNT(*) AS total_orders
FROM orders
GROUP BY user_id
)
SELECT *
FROM users u
LEFT JOIN user_ltv ltv ON u.user_id = ltv.user_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let cte_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Cte && n.qualified_name.as_deref() == Some("user_ltv"))
.collect();
assert!(!cte_nodes.is_empty(), "user_ltv CTE nodes not found");
let cte_node_ids: Vec<_> = cte_nodes.iter().map(|n| n.id.as_ref()).collect();
let cte_col_ids: Vec<_> = stmt
.edges
.iter()
.filter(|e| {
e.edge_type == EdgeType::Ownership
&& cte_node_ids.contains(&e.from.as_ref())
})
.map(|e| e.to.clone())
.collect();
let cte_related_ids: Vec<&str> = cte_node_ids
.iter()
.copied()
.chain(cte_col_ids.iter().map(|s| s.as_ref()))
.collect();
let join_edge = stmt
.edges
.iter()
.find(|e| cte_related_ids.contains(&e.from.as_ref()) && e.join_type == Some(JoinType::Left))
.expect("CTE should have an edge with LEFT join metadata");
assert_eq!(
join_edge.join_condition.as_deref(),
Some("u.user_id = ltv.user_id"),
"CTE edge should capture join condition"
);
}
#[test]
fn deeply_nested_and_predicates_split_correctly() {
let sql = r#"
SELECT * FROM users
WHERE a = 1 AND b = 2 AND c = 3 AND d = 4
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let users_node = find_table_node(&stmt, "users").expect("users table not found");
eprintln!("users filters: {:?}", users_node.filters);
assert_eq!(
users_node.filters.len(),
4,
"Should split into 4 separate predicates"
);
}
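// Aggregation metadata: for GROUP BY queries, output columns carry an
// `aggregation` record marking grouping keys (`is_grouping_key`) and aggregate
// functions (`function`, `distinct`), including aggregates nested inside
// arithmetic expressions and CASE arms.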
#[test]
fn aggregation_detects_grouping_key() {
let sql = r#"
SELECT region, SUM(amount) AS total
FROM orders
GROUP BY region;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let region_col = find_column_node(&stmt, "region").expect("region column not found");
assert!(
region_col.aggregation.is_some(),
"region column should have aggregation info"
);
let agg = region_col.aggregation.as_ref().unwrap();
assert!(
agg.is_grouping_key,
"region should be marked as grouping key"
);
assert!(
agg.function.is_none(),
"grouping key should not have function"
);
}
#[test]
fn aggregation_detects_aggregate_function() {
let sql = r#"
SELECT region, SUM(amount) AS total
FROM orders
GROUP BY region;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let total_col = find_column_node(&stmt, "total").expect("total column not found");
assert!(
total_col.aggregation.is_some(),
"total column should have aggregation info"
);
let agg = total_col.aggregation.as_ref().unwrap();
assert!(
!agg.is_grouping_key,
"total should not be marked as grouping key"
);
assert_eq!(
agg.function.as_deref(),
Some("SUM"),
"should detect SUM function"
);
}
#[test]
fn aggregation_detects_distinct() {
let sql = r#"
SELECT region, COUNT(DISTINCT user_id) AS unique_users
FROM orders
GROUP BY region;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let unique_users_col =
find_column_node(&stmt, "unique_users").expect("unique_users column not found");
assert!(
unique_users_col.aggregation.is_some(),
"unique_users column should have aggregation info"
);
let agg = unique_users_col.aggregation.as_ref().unwrap();
assert_eq!(
agg.function.as_deref(),
Some("COUNT"),
"should detect COUNT function"
);
assert_eq!(agg.distinct, Some(true), "should detect DISTINCT modifier");
}
#[test]
fn aggregation_no_info_without_group_by() {
let sql = r#"
SELECT region, amount
FROM orders;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let region_col = find_column_node(&stmt, "region").expect("region column not found");
let amount_col = find_column_node(&stmt, "amount").expect("amount column not found");
assert!(
region_col.aggregation.is_none(),
"region should not have aggregation info without GROUP BY"
);
assert!(
amount_col.aggregation.is_none(),
"amount should not have aggregation info without GROUP BY"
);
}
#[test]
fn aggregation_multiple_grouping_keys() {
let sql = r#"
SELECT region, product_type, AVG(price) AS avg_price
FROM products
GROUP BY region, product_type;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let region_col = find_column_node(&stmt, "region").expect("region column not found");
let product_type_col =
find_column_node(&stmt, "product_type").expect("product_type column not found");
let avg_price_col = find_column_node(&stmt, "avg_price").expect("avg_price column not found");
assert!(
region_col
.aggregation
.as_ref()
.map(|a| a.is_grouping_key)
.unwrap_or(false),
"region should be grouping key"
);
assert!(
product_type_col
.aggregation
.as_ref()
.map(|a| a.is_grouping_key)
.unwrap_or(false),
"product_type should be grouping key"
);
assert_eq!(
avg_price_col
.aggregation
.as_ref()
.and_then(|a| a.function.as_deref()),
Some("AVG"),
"avg_price should have AVG function"
);
}
#[test]
fn aggregation_nested_in_expression() {
let sql = r#"
SELECT region, SUM(amount) * 1.1 AS total_with_tax
FROM orders
GROUP BY region;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let total_col =
find_column_node(&stmt, "total_with_tax").expect("total_with_tax column not found");
assert!(
total_col.aggregation.is_some(),
"total_with_tax should have aggregation info"
);
let agg = total_col.aggregation.as_ref().unwrap();
assert_eq!(
agg.function.as_deref(),
Some("SUM"),
"should detect SUM in expression"
);
}
#[test]
fn aggregation_in_case_expression() {
let sql = r#"
SELECT
region,
CASE WHEN SUM(amount) > 1000 THEN 'high' ELSE 'low' END AS volume
FROM orders
GROUP BY region;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let volume_col = find_column_node(&stmt, "volume").expect("volume column not found");
assert!(
volume_col.aggregation.is_some(),
"volume should have aggregation info from CASE"
);
let agg = volume_col.aggregation.as_ref().unwrap();
assert_eq!(
agg.function.as_deref(),
Some("SUM"),
"should detect SUM in CASE expression"
);
}
#[test]
fn aggregation_qualified_column_as_grouping_key() {
let sql = r#"
SELECT o.region, SUM(o.amount) AS total
FROM orders o
GROUP BY o.region;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let region_col = find_column_node(&stmt, "region").expect("region column not found");
assert!(
region_col
.aggregation
.as_ref()
.map(|a| a.is_grouping_key)
.unwrap_or(false),
"qualified column should match grouping key"
);
}
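// CTE and derived-table lineage: nested subqueries, CTE chains, and CTEs that
// reference other CTEs should each appear as Cte nodes, with base tables still
// reachable through Derivation edges.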
#[test]
fn nested_derived_tables_track_full_lineage() {
let sql = r#"
SELECT outer_sub.total
FROM (
SELECT inner_sub.amount AS total
FROM (
SELECT SUM(amount) AS amount
FROM orders
) AS inner_sub
) AS outer_sub
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let inner_node = stmt
.nodes
.iter()
.find(|n| n.node_type == NodeType::Cte && &*n.label == "inner_sub");
let outer_node = stmt
.nodes
.iter()
.find(|n| n.node_type == NodeType::Cte && &*n.label == "outer_sub");
assert!(
inner_node.is_some(),
"inner derived table node should exist"
);
assert!(
outer_node.is_some(),
"outer derived table node should exist"
);
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"base table should be tracked through nested derived tables"
);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
!derivations.is_empty(),
"nested derived tables should produce derivation edges"
);
}
#[test]
fn cte_referencing_another_cte() {
let sql = r#"
WITH base_orders AS (
SELECT user_id, SUM(amount) AS total_amount
FROM orders
GROUP BY user_id
),
enriched_orders AS (
SELECT bo.user_id, bo.total_amount, u.name
FROM base_orders bo
JOIN users u ON bo.user_id = u.id
)
SELECT user_id, name, total_amount
FROM enriched_orders
WHERE total_amount > 100
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let base_cte = stmt
.nodes
.iter()
.find(|n| n.node_type == NodeType::Cte && &*n.label == "base_orders");
let enriched_cte = stmt
.nodes
.iter()
.find(|n| n.node_type == NodeType::Cte && &*n.label == "enriched_orders");
assert!(base_cte.is_some(), "base_orders CTE should exist");
assert!(enriched_cte.is_some(), "enriched_orders CTE should exist");
let tables = collect_table_names(&result);
assert!(tables.contains("orders"), "orders table should be tracked");
assert!(tables.contains("users"), "users table should be tracked");
let ctes = collect_cte_names(&result);
assert!(ctes.contains("base_orders"), "base_orders CTE should exist");
assert!(
ctes.contains("enriched_orders"),
"enriched_orders CTE should exist"
);
}
#[test]
fn alias_shadows_table_name() {
let sql = r#"
SELECT orders.amount
FROM payments AS orders
"#;
let schema = SchemaMetadata {
allow_implied: false,
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
tables: vec![
schema_table(None, None, "orders", &["id", "amount"]),
schema_table(None, None, "payments", &["id", "amount"]),
],
};
let result = run_analysis(sql, Dialect::Generic, Some(schema));
let tables = collect_table_names(&result);
assert!(
tables.contains("payments"),
"payments table should be tracked (aliased as orders)"
);
assert!(
!tables.contains("orders"),
"real orders table should not be tracked when alias shadows it"
);
}
#[test]
fn deeply_nested_cte_chain() {
let sql = r#"
WITH step1 AS (
SELECT id, amount FROM orders
),
step2 AS (
SELECT id, amount * 1.1 AS adjusted FROM step1
),
step3 AS (
SELECT id, adjusted * 0.9 AS final_amount FROM step2
)
SELECT id, final_amount FROM step3
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let ctes = collect_cte_names(&result);
assert!(ctes.contains("step1"), "step1 CTE should exist");
assert!(ctes.contains("step2"), "step2 CTE should exist");
assert!(ctes.contains("step3"), "step3 CTE should exist");
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"orders table should be tracked through CTE chain"
);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
derivations.len() >= 2,
"CTE chain should produce multiple derivation edges for transformations"
);
}
#[test]
fn derived_table_inside_cte() {
let sql = r#"
WITH summary AS (
SELECT sub.user_id, sub.order_count
FROM (
SELECT user_id, COUNT(*) AS order_count
FROM orders
GROUP BY user_id
) AS sub
WHERE sub.order_count > 5
)
SELECT * FROM summary
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let ctes = collect_cte_names(&result);
assert!(ctes.contains("summary"), "summary CTE should exist");
let derived_node = stmt
.nodes
.iter()
.find(|n| n.node_type == NodeType::Cte && &*n.label == "sub");
assert!(
derived_node.is_some(),
"derived table inside CTE should exist as node"
);
let tables = collect_table_names(&result);
assert!(
tables.contains("orders"),
"orders table should be tracked through CTE with nested derived table"
);
}
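// CASE expressions: simple and searched forms, nesting inside functions, and
// subqueries in WHEN conditions should all parse and produce an output column
// named after the alias.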
#[test]
fn case_when_simple_expression() {
let sql = r#"
SELECT
id,
CASE
WHEN status = 'active' THEN 'Active'
WHEN status = 'pending' THEN 'Pending'
WHEN status = 'inactive' THEN 'Inactive'
ELSE 'Unknown'
END AS status_label
FROM users
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(!result.summary.has_errors, "should parse without errors");
let stmt = first_statement(&result);
let status_col = find_column_node(&stmt, "status_label");
assert!(
status_col.is_some(),
"CASE expression should produce status_label column"
);
let tables = collect_table_names(&result);
assert!(tables.contains("users"), "users table should be tracked");
}
#[test]
fn case_when_searched_form() {
let sql = r#"
SELECT
order_id,
CASE
WHEN amount > 1000 THEN 'large'
WHEN amount > 100 THEN 'medium'
ELSE 'small'
END AS size_category,
CASE status
WHEN 'A' THEN 'Active'
WHEN 'P' THEN 'Pending'
ELSE 'Other'
END AS status_name
FROM orders
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(!result.summary.has_errors, "should parse without errors");
let stmt = first_statement(&result);
let size_col = find_column_node(&stmt, "size_category");
let status_col = find_column_node(&stmt, "status_name");
assert!(size_col.is_some(), "searched CASE should work");
assert!(status_col.is_some(), "simple CASE should work");
}
#[test]
fn case_when_nested_in_function() {
let sql = r#"
SELECT
COALESCE(
CASE WHEN active THEN name ELSE NULL END,
'default'
) AS display_name
FROM users
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(
!result.summary.has_errors,
"nested CASE in function should parse"
);
let stmt = first_statement(&result);
let col = find_column_node(&stmt, "display_name");
assert!(col.is_some(), "nested CASE should produce output column");
}
#[test]
fn case_when_with_subquery() {
let sql = r#"
SELECT
u.id,
CASE
WHEN u.id IN (SELECT user_id FROM premium_users) THEN 'premium'
ELSE 'standard'
END AS tier
FROM users u
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(
!result.summary.has_errors,
"CASE with subquery should parse"
);
let tables = collect_table_names(&result);
assert!(tables.contains("users"), "main table tracked");
let stmt = first_statement(&result);
let tier_col = find_column_node(&stmt, "tier");
assert!(
tier_col.is_some(),
"CASE with subquery produces tier column"
);
}
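// Dialect-specific join syntax. These tests are deliberately tolerant: when
// the parser rejects the syntax (summary.has_errors), the assertions are
// skipped rather than failed, so they only pin behavior where the dialect
// actually accepts the construct.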
#[test]
fn left_semi_join_tracks_tables() {
let sql = r#"
SELECT o.order_id, o.total
FROM orders o
LEFT SEMI JOIN customers c ON o.customer_id = c.id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
if !result.summary.has_errors {
let stmt = first_statement(&result);
let tables = collect_table_names(&result);
assert!(tables.contains("orders"), "orders table should be tracked");
assert!(
tables.contains("customers"),
"customers table should be tracked"
);
let orders_node = find_table_node(&stmt, "orders");
let customers_node = find_table_node(&stmt, "customers");
assert!(orders_node.is_some(), "orders node exists");
assert!(customers_node.is_some(), "customers node exists");
if let Some(cust) = customers_node {
let join_edge = stmt
.edges
.iter()
.find(|e| e.from == cust.id && e.join_type.is_some());
assert_eq!(
join_edge.and_then(|e| e.join_type),
Some(JoinType::LeftSemi),
"should recognize LEFT SEMI JOIN on edge"
);
}
}
}
#[test]
fn left_anti_join_tracks_tables() {
let sql = r#"
SELECT o.order_id, o.total
FROM orders o
LEFT ANTI JOIN returns r ON o.order_id = r.order_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
if !result.summary.has_errors {
let stmt = first_statement(&result);
let tables = collect_table_names(&result);
assert!(tables.contains("orders"), "orders tracked");
assert!(tables.contains("returns"), "returns tracked");
let returns_node = find_table_node(&stmt, "returns");
if let Some(ret) = returns_node {
let join_edge = stmt
.edges
.iter()
.find(|e| e.from == ret.id && e.join_type.is_some());
assert_eq!(
join_edge.and_then(|e| e.join_type),
Some(JoinType::LeftAnti),
"should recognize LEFT ANTI JOIN on edge"
);
}
}
}
#[test]
fn straight_join_mysql_syntax() {
let sql = r#"
SELECT o.id, c.name
FROM orders o
STRAIGHT_JOIN customers c ON o.customer_id = c.id
"#;
let result = run_analysis(sql, Dialect::Mysql, None);
if !result.summary.has_errors {
let tables = collect_table_names(&result);
assert!(tables.contains("orders"), "orders tracked");
assert!(tables.contains("customers"), "customers tracked");
let stmt = first_statement(&result);
let cust_node = find_table_node(&stmt, "customers");
if let Some(cust) = cust_node {
let owned_col_ids: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::Ownership && e.from == cust.id)
.map(|e| e.to.clone())
.collect();
let join_edge = stmt.edges.iter().find(|e| {
e.join_type.is_some()
&& (e.from == cust.id || owned_col_ids.iter().any(|c| c == &e.from))
});
assert_eq!(
join_edge.and_then(|e| e.join_type),
Some(JoinType::Inner),
"STRAIGHT_JOIN should be treated as Inner join on edge"
);
}
}
}
#[test]
fn cross_apply_sql_server_syntax() {
let sql = r#"
SELECT e.name, d.dept_name
FROM employees e
CROSS APPLY (
SELECT dept_name FROM departments WHERE dept_id = e.dept_id
) d
"#;
let result = run_analysis(sql, Dialect::Mssql, None);
if !result.summary.has_errors {
let tables = collect_table_names(&result);
assert!(tables.contains("employees"), "employees tracked");
assert!(
tables.contains("departments"),
"departments tracked in CROSS APPLY"
);
}
}
#[test]
fn outer_apply_sql_server_syntax() {
let sql = r#"
SELECT e.name, d.dept_name
FROM employees e
OUTER APPLY (
SELECT TOP 1 dept_name FROM departments WHERE dept_id = e.dept_id
) d
"#;
let result = run_analysis(sql, Dialect::Mssql, None);
if !result.summary.has_errors {
let tables = collect_table_names(&result);
assert!(tables.contains("employees"), "employees tracked");
}
}
#[test]
fn minus_set_operator_oracle_syntax() {
let sql = r#"
SELECT customer_id FROM all_customers
MINUS
SELECT customer_id FROM inactive_customers
"#;
let result = run_analysis(sql, Dialect::Generic, None);
if !result.summary.has_errors {
let tables = collect_table_names(&result);
assert!(
tables.contains("all_customers"),
"all_customers tracked in MINUS"
);
assert!(
tables.contains("inactive_customers"),
"inactive_customers tracked in MINUS"
);
}
}
#[test]
fn minus_with_multiple_columns() {
let sql = r#"
SELECT id, name, email FROM users_2024
MINUS
SELECT id, name, email FROM users_2023
"#;
let result = run_analysis(sql, Dialect::Generic, None);
if !result.summary.has_errors {
let tables = collect_table_names(&result);
assert!(tables.contains("users_2024"), "users_2024 tracked");
assert!(tables.contains("users_2023"), "users_2023 tracked");
}
}
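// UPDATE ... FROM: both the update target and every source relation in the
// FROM clause (joined tables and subqueries included) should be tracked.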
#[test]
fn update_from_single_source_table() {
let sql = r#"
UPDATE target_table t
SET t.value = s.new_value
FROM source_table s
WHERE t.id = s.id
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
assert!(!result.summary.has_errors, "UPDATE FROM should parse");
let tables = collect_table_names(&result);
assert!(tables.contains("target_table"), "target table tracked");
assert!(
tables.contains("source_table"),
"source table in FROM tracked"
);
}
#[test]
fn update_from_multiple_source_tables() {
let sql = r#"
UPDATE orders o
SET o.status = 'shipped',
o.shipping_date = s.ship_date
FROM shipments s
JOIN carriers c ON s.carrier_id = c.id
WHERE o.id = s.order_id
AND c.active = true
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
if !result.summary.has_errors {
let tables = collect_table_names(&result);
assert!(tables.contains("orders"), "target table tracked");
assert!(tables.contains("shipments"), "shipments source tracked");
assert!(tables.contains("carriers"), "carriers source tracked");
}
}
#[test]
fn update_from_with_subquery() {
let sql = r#"
UPDATE products p
SET p.avg_rating = agg.rating
FROM (
SELECT product_id, AVG(rating) as rating
FROM reviews
GROUP BY product_id
) agg
WHERE p.id = agg.product_id
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
if !result.summary.has_errors {
let tables = collect_table_names(&result);
assert!(tables.contains("products"), "target table tracked");
assert!(tables.contains("reviews"), "subquery source tracked");
}
}
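// Exotic table factors: OPENJSON, XMLTABLE, LATERAL subqueries, UNNEST, and
// table functions. Assertions are guarded on successful parsing since support
// varies by dialect.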
#[test]
fn openjson_table_factor_sql_server() {
let sql = r#"
SELECT j.id, j.name
FROM documents d
CROSS APPLY OPENJSON(d.json_data)
WITH (id INT, name VARCHAR(100)) AS j
"#;
let result = run_analysis(sql, Dialect::Mssql, None);
if !result.summary.has_errors {
let tables = collect_table_names(&result);
assert!(tables.contains("documents"), "documents table tracked");
}
}
#[test]
fn xmltable_factor_oracle_postgres() {
let sql = r#"
SELECT x.id, x.value
FROM xml_data d,
XMLTABLE('/root/item'
PASSING d.xml_content
COLUMNS
id INT PATH '@id',
value VARCHAR(100) PATH 'text()'
) AS x
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
if !result.summary.has_errors {
let tables = collect_table_names(&result);
assert!(tables.contains("xml_data"), "xml_data table tracked");
}
}
#[test]
fn lateral_derived_table() {
let sql = r#"
SELECT e.name, d.dept_name
FROM employees e,
LATERAL (
SELECT dept_name
FROM departments
WHERE dept_id = e.dept_id
LIMIT 1
) AS d
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
if !result.summary.has_errors {
let tables = collect_table_names(&result);
assert!(tables.contains("employees"), "employees tracked");
assert!(
tables.contains("departments"),
"lateral subquery table tracked"
);
}
}
#[test]
fn unnest_table_factor() {
let sql = r#"
SELECT u.name, tag
FROM users u
CROSS JOIN UNNEST(u.tags) AS tag
"#;
let result = run_analysis(sql, Dialect::Bigquery, None);
if !result.summary.has_errors {
let tables = collect_table_names(&result);
assert!(tables.contains("users"), "users table tracked");
}
}
#[test]
fn table_function_call() {
let sql = r#"
SELECT *
FROM generate_series(1, 10) AS nums(n)
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
assert!(
!result.summary.has_errors || result.issues.iter().any(|i| i.code.contains("PARSE")),
"should either parse or report parse error"
);
}
#[test]
fn deeply_nested_case_expressions() {
let sql = r#"
SELECT
CASE WHEN a = 1 THEN
CASE WHEN b = 2 THEN
CASE WHEN c = 3 THEN 'deep'
ELSE 'c_fail'
END
ELSE 'b_fail'
END
ELSE 'a_fail'
END AS nested_result
FROM test_table
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(!result.summary.has_errors, "nested CASE should parse");
let stmt = first_statement(&result);
let col = find_column_node(&stmt, "nested_result");
assert!(col.is_some(), "nested CASE produces column");
}
#[test]
fn case_with_aggregate_in_condition() {
let sql = r#"
SELECT
customer_id,
CASE
WHEN COUNT(*) > 10 THEN 'frequent'
WHEN COUNT(*) > 5 THEN 'regular'
ELSE 'occasional'
END AS customer_type
FROM orders
GROUP BY customer_id
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(
!result.summary.has_errors,
"CASE with aggregates should parse"
);
let stmt = first_statement(&result);
let col = find_column_node(&stmt, "customer_type");
assert!(col.is_some(), "CASE with aggregate produces column");
}
#[test]
fn multiple_join_types_in_single_query() {
let sql = r#"
SELECT o.id, c.name, p.product_name, s.status
FROM orders o
INNER JOIN customers c ON o.customer_id = c.id
LEFT JOIN products p ON o.product_id = p.id
RIGHT JOIN order_status s ON o.status_id = s.id
CROSS JOIN config cfg
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(!result.summary.has_errors, "mixed joins should parse");
let tables = collect_table_names(&result);
assert!(tables.contains("orders"), "orders tracked");
assert!(tables.contains("customers"), "customers tracked");
assert!(tables.contains("products"), "products tracked");
assert!(tables.contains("order_status"), "order_status tracked");
assert!(tables.contains("config"), "config tracked");
let stmt = first_statement(&result);
let find_join_edge = |node_id: &str| -> Option<JoinType> {
let owned_col_ids: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::Ownership && e.from.as_ref() == node_id)
.map(|e| e.to.clone())
.collect();
stmt.edges
.iter()
.find(|e| {
e.join_type.is_some()
&& (e.from.as_ref() == node_id || owned_col_ids.iter().any(|c| c == &e.from))
})
.and_then(|e| e.join_type)
};
if let Some(cust) = find_table_node(&stmt, "customers") {
assert_eq!(
find_join_edge(cust.id.as_ref()),
Some(JoinType::Inner),
"INNER JOIN detected on edge"
);
}
if let Some(prod) = find_table_node(&stmt, "products") {
assert_eq!(
find_join_edge(prod.id.as_ref()),
Some(JoinType::Left),
"LEFT JOIN detected on edge"
);
}
if let Some(status) = find_table_node(&stmt, "order_status") {
assert_eq!(
find_join_edge(status.id.as_ref()),
Some(JoinType::Right),
"RIGHT JOIN detected on edge"
);
}
if let Some(cfg) = find_table_node(&stmt, "config") {
assert_eq!(
find_join_edge(cfg.id.as_ref()),
Some(JoinType::Cross),
"CROSS JOIN detected on edge"
);
}
}
#[test]
fn set_operations_with_all_operators() {
let sql = r#"
SELECT id FROM table_a
UNION
SELECT id FROM table_b
INTERSECT
SELECT id FROM table_c
EXCEPT
SELECT id FROM table_d
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(!result.summary.has_errors, "set operations should parse");
let tables = collect_table_names(&result);
assert!(tables.contains("table_a"), "table_a tracked");
assert!(tables.contains("table_b"), "table_b tracked");
assert!(tables.contains("table_c"), "table_c tracked");
assert!(tables.contains("table_d"), "table_d tracked");
}
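// DDL constraint extraction: CREATE TABLE statements populate resolved_schema
// with inline and table-level PRIMARY KEY / FOREIGN KEY constraints, including
// composite keys.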
#[test]
fn create_table_with_inline_primary_key() {
let sql = r#"
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT
);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(
!result.summary.has_errors,
"CREATE TABLE with PK should parse"
);
let resolved_schema = result
.resolved_schema
.as_ref()
.expect("should have resolved schema");
let users_table = resolved_schema
.tables
.iter()
.find(|t| t.name == "users")
.expect("users table should exist");
let id_col = users_table
.columns
.iter()
.find(|c| c.name == "id")
.expect("id column should exist");
assert_eq!(
id_col.is_primary_key,
Some(true),
"id should be marked as PK"
);
let name_col = users_table
.columns
.iter()
.find(|c| c.name == "name")
.expect("name column should exist");
assert_eq!(name_col.is_primary_key, None, "name should not be PK");
}
#[test]
fn create_table_with_table_level_primary_key() {
let sql = r#"
CREATE TABLE orders (
order_id INTEGER,
customer_id INTEGER,
total DECIMAL,
PRIMARY KEY (order_id)
);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(
!result.summary.has_errors,
"CREATE TABLE with table-level PK should parse"
);
let resolved_schema = result
.resolved_schema
.as_ref()
.expect("should have resolved schema");
let orders_table = resolved_schema
.tables
.iter()
.find(|t| t.name == "orders")
.expect("orders table should exist");
let order_id_col = orders_table
.columns
.iter()
.find(|c| c.name == "order_id")
.expect("order_id column should exist");
assert_eq!(
order_id_col.is_primary_key,
Some(true),
"order_id should be marked as PK"
);
assert!(
!orders_table.constraints.is_empty(),
"should have table constraints"
);
let pk_constraint = orders_table
.constraints
.iter()
.find(|c| matches!(c.constraint_type, ConstraintType::PrimaryKey))
.expect("PK constraint should exist");
assert_eq!(pk_constraint.columns, vec!["order_id"]);
}
#[test]
fn create_table_with_inline_foreign_key() {
let sql = r#"
CREATE TABLE order_items (
id INTEGER PRIMARY KEY,
order_id INTEGER REFERENCES orders(id),
product_name TEXT
);
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
assert!(
!result.summary.has_errors,
"CREATE TABLE with FK should parse"
);
let resolved_schema = result
.resolved_schema
.as_ref()
.expect("should have resolved schema");
let items_table = resolved_schema
.tables
.iter()
.find(|t| t.name == "order_items")
.expect("order_items table should exist");
let order_id_col = items_table
.columns
.iter()
.find(|c| c.name == "order_id")
.expect("order_id column should exist");
let fk_ref = order_id_col
.foreign_key
.as_ref()
.expect("should have FK reference");
assert_eq!(fk_ref.table, "orders");
assert_eq!(fk_ref.column, "id");
}
#[test]
fn create_table_with_table_level_foreign_key() {
let sql = r#"
CREATE TABLE order_items (
id INTEGER PRIMARY KEY,
order_id INTEGER,
product_id INTEGER,
quantity INTEGER,
FOREIGN KEY (order_id) REFERENCES orders(id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(
!result.summary.has_errors,
"CREATE TABLE with table-level FK should parse"
);
let resolved_schema = result
.resolved_schema
.as_ref()
.expect("should have resolved schema");
let items_table = resolved_schema
.tables
.iter()
.find(|t| t.name == "order_items")
.expect("order_items table should exist");
let fk_constraints: Vec<_> = items_table
.constraints
.iter()
.filter(|c| matches!(c.constraint_type, ConstraintType::ForeignKey))
.collect();
assert_eq!(fk_constraints.len(), 2, "should have 2 FK constraints");
let order_fk = fk_constraints
.iter()
.find(|c| c.columns.contains(&"order_id".to_string()))
.expect("order FK should exist");
assert_eq!(order_fk.referenced_table.as_deref(), Some("orders"));
assert_eq!(
order_fk.referenced_columns.as_deref(),
Some(&["id".to_string()][..])
);
let product_fk = fk_constraints
.iter()
.find(|c| c.columns.contains(&"product_id".to_string()))
.expect("product FK should exist");
assert_eq!(product_fk.referenced_table.as_deref(), Some("products"));
}
#[test]
fn create_table_with_composite_primary_key() {
let sql = r#"
CREATE TABLE order_line_items (
order_id INTEGER,
line_number INTEGER,
product_id INTEGER,
quantity INTEGER,
PRIMARY KEY (order_id, line_number)
);
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(
!result.summary.has_errors,
"CREATE TABLE with composite PK should parse"
);
let resolved_schema = result
.resolved_schema
.as_ref()
.expect("should have resolved schema");
let items_table = resolved_schema
.tables
.iter()
.find(|t| t.name == "order_line_items")
.expect("order_line_items table should exist");
let order_id_col = items_table
.columns
.iter()
.find(|c| c.name == "order_id")
.expect("order_id column should exist");
assert_eq!(
order_id_col.is_primary_key,
Some(true),
"order_id should be marked as PK"
);
let line_num_col = items_table
.columns
.iter()
.find(|c| c.name == "line_number")
.expect("line_number column should exist");
assert_eq!(
line_num_col.is_primary_key,
Some(true),
"line_number should be marked as PK"
);
let pk_constraint = items_table
.constraints
.iter()
.find(|c| matches!(c.constraint_type, ConstraintType::PrimaryKey))
.expect("PK constraint should exist");
assert_eq!(pk_constraint.columns.len(), 2);
assert!(pk_constraint.columns.contains(&"order_id".to_string()));
assert!(pk_constraint.columns.contains(&"line_number".to_string()));
}
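// COPY lineage: COPY FROM/TO statements should surface the target table (and
// any tables referenced by an inline query) as nodes without raising errors.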
#[test]
fn test_copy_statement_lineage() {
let sql = "COPY users FROM 's3://bucket/users.csv'";
let result = run_analysis(sql, Dialect::Generic, None);
assert!(result.issues.iter().all(|i| i.severity != Severity::Error));
let stmt = StmtView::new(&result, 0);
assert!(stmt.nodes.iter().any(|n| n.label.contains("users")));
}
#[test]
fn test_copy_into_snowflake() {
let sql = "COPY INTO analytics.orders FROM @my_stage/orders/";
let result = run_analysis(sql, Dialect::Snowflake, None);
assert!(result.issues.iter().all(|i| i.severity != Severity::Error));
let stmt = StmtView::new(&result, 0);
assert!(stmt.nodes.iter().any(|n| n.label.contains("orders")));
}
#[test]
fn test_copy_to_with_query() {
let sql = "COPY (SELECT id, name FROM users WHERE active = true) TO '/tmp/out.csv'";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
assert!(stmt.nodes.iter().any(|n| n.label.contains("users")));
}
#[test]
fn test_copy_with_column_list() {
let sql = "COPY users (id, name, email) FROM '/tmp/users.csv'";
let result = run_analysis(sql, Dialect::Postgres, None);
assert!(result.issues.iter().all(|i| i.severity != Severity::Error));
let stmt = StmtView::new(&result, 0);
assert!(
stmt.nodes.iter().any(|n| n.label.as_ref() == "users"),
"Expected 'users' table in lineage"
);
}
#[test]
fn test_copy_schema_qualified_table() {
let sql = "COPY analytics.events FROM 's3://bucket/events.csv'";
let result = run_analysis(sql, Dialect::Postgres, None);
assert!(result.issues.iter().all(|i| i.severity != Severity::Error));
let stmt = StmtView::new(&result, 0);
let table_node = stmt
.nodes
.iter()
.find(|n| n.node_type == NodeType::Table)
.expect("Expected a table node");
assert!(
table_node.qualified_name.as_ref().map(|n| n.as_ref()) == Some("analytics.events")
|| table_node.label.as_ref() == "events",
"Expected 'analytics.events' table, got: {:?}",
table_node
);
}
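// UNLOAD lineage (Redshift): the query argument, whether a string literal or
// an inline query, is parsed so its source tables appear in the lineage; a
// malformed query string degrades to a PARSE_ERROR warning instead of failing.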
#[test]
fn test_unload_statement_string_query() {
let sql = r#"UNLOAD ('SELECT * FROM orders') TO 's3://bucket/out'"#;
let result = run_analysis(sql, Dialect::Redshift, None);
assert!(result.issues.iter().all(|i| i.severity != Severity::Error));
let stmt = StmtView::new(&result, 0);
assert_eq!(result.statements[0].statement_type, "UNLOAD");
assert!(
stmt.nodes.iter().any(|n| n.label.contains("orders")),
"Expected 'orders' table in lineage, got: {:?}",
stmt.nodes.iter().map(|n| &n.label).collect::<Vec<_>>()
);
}
#[test]
fn test_unload_statement_parsed_query() {
let sql = r#"UNLOAD (SELECT id, name FROM users WHERE active = true) TO 's3://bucket/out'"#;
let result = run_analysis(sql, Dialect::Redshift, None);
assert!(result.issues.iter().all(|i| i.severity != Severity::Error));
let stmt = StmtView::new(&result, 0);
assert_eq!(result.statements[0].statement_type, "UNLOAD");
assert!(
stmt.nodes.iter().any(|n| n.label.contains("users")),
"Expected 'users' table in lineage, got: {:?}",
stmt.nodes.iter().map(|n| &n.label).collect::<Vec<_>>()
);
}
#[test]
fn test_unload_statement_qualified_table() {
let sql = r#"UNLOAD ('SELECT * FROM analytics.orders WHERE order_date > ''2024-01-01''')
TO 's3://bucket/exports/orders_'
IAM_ROLE 'arn:aws:iam::123456789:role/RedshiftCopyRole'"#;
let result = run_analysis(sql, Dialect::Redshift, None);
assert!(result.issues.iter().all(|i| i.severity != Severity::Error));
let stmt = StmtView::new(&result, 0);
assert_eq!(result.statements[0].statement_type, "UNLOAD");
let table_node = stmt
.nodes
.iter()
.find(|n| n.node_type == NodeType::Table)
.expect("Should have a table node");
assert!(
table_node.qualified_name.as_ref().map(|n| n.as_ref()) == Some("analytics.orders")
|| table_node.label.as_ref() == "orders",
"Expected 'analytics.orders' table in lineage, got: {:?}",
table_node
);
}
#[test]
fn test_unload_statement_with_join() {
let sql = r#"UNLOAD ('SELECT o.id, c.name FROM orders o JOIN customers c ON o.customer_id = c.id')
TO 's3://bucket/out'"#;
let result = run_analysis(sql, Dialect::Redshift, None);
assert!(result.issues.iter().all(|i| i.severity != Severity::Error));
let stmt = StmtView::new(&result, 0);
let table_labels: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.map(|n| n.label.as_ref())
.collect();
assert!(
table_labels.contains(&"orders"),
"Expected 'orders' table, got: {:?}",
table_labels
);
assert!(
table_labels.contains(&"customers"),
"Expected 'customers' table, got: {:?}",
table_labels
);
}
#[test]
fn test_unload_statement_malformed_query_string() {
let sql = r#"UNLOAD ('SELECT * FROM WHERE invalid syntax')
TO 's3://bucket/out'"#;
let result = run_analysis(sql, Dialect::Redshift, None);
assert!(
!result.statements.is_empty(),
"Should still produce a statement even with malformed query"
);
assert!(
result
.issues
.iter()
.any(|i| i.severity == Severity::Warning && i.code == issue_codes::PARSE_ERROR),
"Expected PARSE_ERROR warning for malformed query, got: {:?}",
result.issues
);
}
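// ALTER TABLE ... RENAME: modeled as a single DataFlow edge from the old table
// node to the new one with operation = "RENAME"; an unqualified new name
// inherits the old table's schema qualifier.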
#[test]
fn test_alter_table_rename() {
let sql = "ALTER TABLE old_users RENAME TO new_users";
let result = run_analysis(sql, Dialect::Generic, None);
assert!(result.issues.iter().all(|i| i.severity != Severity::Error));
let stmt = StmtView::new(&result, 0);
let labels: Vec<_> = stmt.nodes.iter().map(|n| n.label.as_ref()).collect();
assert!(labels.contains(&"old_users"));
assert!(labels.contains(&"new_users"));
let old_node = stmt
.nodes
.iter()
.find(|n| n.label.as_ref() == "old_users")
.expect("old_users node should exist");
let new_node = stmt
.nodes
.iter()
.find(|n| n.label.as_ref() == "new_users")
.expect("new_users node should exist");
assert_eq!(stmt.edges.len(), 1, "Should have exactly one edge");
let edge = &stmt.edges[0];
assert_eq!(
edge.edge_type,
EdgeType::DataFlow,
"Edge should be DataFlow"
);
assert_eq!(
edge.from.as_ref(),
old_node.id.as_ref(),
"Edge should be from old_users"
);
assert_eq!(
edge.to.as_ref(),
new_node.id.as_ref(),
"Edge should be to new_users"
);
assert_eq!(
edge.operation.as_ref().map(|o| o.as_ref()),
Some("RENAME"),
"Operation should be RENAME"
);
}
#[test]
fn test_alter_table_rename_with_schema() {
let sql = "ALTER TABLE analytics.legacy_orders RENAME TO analytics.orders_v2";
let result = run_analysis(sql, Dialect::Generic, None);
assert!(result.issues.iter().all(|i| i.severity != Severity::Error));
let stmt = StmtView::new(&result, 0);
let qualified_names: Vec<_> = stmt
.nodes
.iter()
.filter_map(|n| n.qualified_name.as_ref())
.map(|qn| qn.as_ref())
.collect();
assert!(qualified_names.contains(&"analytics.legacy_orders"));
assert!(qualified_names.contains(&"analytics.orders_v2"));
let old_node = stmt
.nodes
.iter()
.find(|n| {
n.qualified_name
.as_ref()
.map(|qn| qn.as_ref() == "analytics.legacy_orders")
.unwrap_or(false)
})
.expect("analytics.legacy_orders node should exist");
let new_node = stmt
.nodes
.iter()
.find(|n| {
n.qualified_name
.as_ref()
.map(|qn| qn.as_ref() == "analytics.orders_v2")
.unwrap_or(false)
})
.expect("analytics.orders_v2 node should exist");
assert_eq!(stmt.edges.len(), 1, "Should have exactly one edge");
let edge = &stmt.edges[0];
assert_eq!(
edge.edge_type,
EdgeType::DataFlow,
"Edge should be DataFlow"
);
assert_eq!(
edge.from.as_ref(),
old_node.id.as_ref(),
"Edge should be from legacy_orders"
);
assert_eq!(
edge.to.as_ref(),
new_node.id.as_ref(),
"Edge should be to orders_v2"
);
assert_eq!(
edge.operation.as_ref().map(|o| o.as_ref()),
Some("RENAME"),
"Operation should be RENAME"
);
}
#[test]
fn test_alter_table_rename_inherits_schema_when_unqualified() {
let sql = "ALTER TABLE analytics.legacy_orders RENAME TO orders_v2";
let result = run_analysis(sql, Dialect::Generic, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
let stmt = StmtView::new(&result, 0);
let old_node = stmt
.nodes
.iter()
.find(|n| {
n.qualified_name
.as_ref()
.map(|qn| qn.as_ref() == "analytics.legacy_orders")
.unwrap_or(false)
})
.expect("analytics.legacy_orders node should exist");
let new_node = stmt
.nodes
.iter()
.find(|n| n.label.as_ref() == "orders_v2")
.expect("orders_v2 node should exist");
assert_eq!(
new_node.qualified_name.as_deref(),
Some("analytics.orders_v2"),
"New node should inherit schema qualification"
);
assert_eq!(stmt.edges.len(), 1, "Should have exactly one edge");
let edge = &stmt.edges[0];
assert_eq!(
edge.from.as_ref(),
old_node.id.as_ref(),
"Edge should originate from old table"
);
assert_eq!(
edge.to.as_ref(),
new_node.id.as_ref(),
"Edge should point to renamed table"
);
assert_eq!(
edge.operation.as_ref().map(|o| o.as_ref()),
Some("RENAME"),
"Edge should be marked as RENAME operation"
);
}
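// Cross-statement scenarios: later statements in the same script should see
// tables created, renamed, or loaded by earlier statements.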
#[test]
fn test_cross_statement_rename_then_select() {
let sql = r#"
ALTER TABLE old_users RENAME TO users;
SELECT id, name FROM users;
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
assert_eq!(result.statements.len(), 2);
let rename_stmt = StmtView::new(&result, 0);
assert!(rename_stmt
.nodes
.iter()
.any(|n| n.label.as_ref() == "old_users"));
assert!(rename_stmt
.nodes
.iter()
.any(|n| n.label.as_ref() == "users"));
let select_stmt = StmtView::new(&result, 1);
assert!(
select_stmt
.nodes
.iter()
.any(|n| n.label.as_ref() == "users"),
"SELECT should reference 'users' table"
);
}
#[test]
fn test_cross_statement_copy_then_select() {
let sql = r#"
COPY users FROM '/data/users.csv';
SELECT id, name FROM users WHERE active = true;
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
assert_eq!(result.statements.len(), 2);
let copy_stmt = StmtView::new(&result, 0);
assert!(
copy_stmt.nodes.iter().any(|n| n.label.as_ref() == "users"),
"COPY should reference 'users' table"
);
let select_stmt = StmtView::new(&result, 1);
assert!(
select_stmt
.nodes
.iter()
.any(|n| n.label.as_ref() == "users"),
"SELECT should reference 'users' table"
);
}
#[test]
fn test_cross_statement_ctas_then_select() {
let sql = r#"
CREATE TABLE active_users AS
SELECT id, name FROM users WHERE active = true;
SELECT * FROM active_users WHERE name LIKE 'A%';
"#;
let result = run_analysis(sql, Dialect::Generic, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
assert_eq!(result.statements.len(), 2);
let ctas_stmt = StmtView::new(&result, 0);
assert!(
ctas_stmt.nodes.iter().any(|n| n.label.as_ref() == "users"),
"CTAS should reference source 'users' table"
);
assert!(
ctas_stmt
.nodes
.iter()
.any(|n| n.label.as_ref() == "active_users"),
"CTAS should create 'active_users' table"
);
let select_stmt = StmtView::new(&result, 1);
assert!(
select_stmt
.nodes
.iter()
.any(|n| n.label.as_ref() == "active_users"),
"SELECT should reference 'active_users' table"
);
}
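// Dialect-specific bulk load/unload forms. Snowflake COPY INTO supports both a
// table target fed by a transformation subquery and a stage location target.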
#[test]
fn test_copy_into_snowflake_with_transformation_query() {
let sql = r#"
COPY INTO target_table
FROM (SELECT $1, $2, CURRENT_TIMESTAMP() FROM @my_stage/data/)
FILE_FORMAT = (TYPE = CSV)
"#;
let result = run_analysis(sql, Dialect::Snowflake, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
let stmt = StmtView::new(&result, 0);
assert!(
stmt.nodes
.iter()
.any(|n| n.label.as_ref() == "target_table"),
"Should have target_table node"
);
}
#[test]
fn test_copy_into_location_from_table() {
let sql = "COPY INTO @my_stage/export/ FROM analytics.orders";
let result = run_analysis(sql, Dialect::Snowflake, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
let stmt = StmtView::new(&result, 0);
assert!(
stmt.nodes.iter().any(|n| n.label.as_ref() == "orders"),
"Should have orders table as source"
);
}
#[test]
fn test_copy_into_location_from_query() {
let sql = r#"
COPY INTO @my_stage/export/
FROM (
SELECT o.id, o.total, c.name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'completed'
)
"#;
let result = run_analysis(sql, Dialect::Snowflake, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
let stmt = StmtView::new(&result, 0);
let labels: Vec<_> = stmt.nodes.iter().map(|n| n.label.to_uppercase()).collect();
assert!(
labels.iter().any(|l| l == "ORDERS"),
"Should have orders table, got: {:?}",
labels
);
assert!(
labels.iter().any(|l| l == "CUSTOMERS"),
"Should have customers table, got: {:?}",
labels
);
}
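// Redshift UNLOAD wraps its query in a single-quoted string literal; the
// analyzer is expected to re-parse that embedded query so the inner tables
// still appear in lineage.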
#[test]
fn test_unload_with_subquery_in_from() {
let sql = r#"
UNLOAD ('
SELECT sub.id, sub.name
FROM (
SELECT id, name FROM users WHERE active = true
) sub
')
TO 's3://bucket/users/'
"#;
let result = run_analysis(sql, Dialect::Redshift, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
let stmt = StmtView::new(&result, 0);
let labels: Vec<_> = stmt.nodes.iter().map(|n| n.label.as_ref()).collect();
assert!(labels.contains(&"users"), "Should have users table");
}
#[test]
fn test_unload_with_scalar_subquery_expression() {
let sql = r#"
UNLOAD ('
SELECT u.id, u.name,
(SELECT COUNT(*) FROM orders o WHERE o.user_id = u.id) as order_count
FROM users u
WHERE u.active = true
')
TO 's3://bucket/users/'
"#;
let result = run_analysis(sql, Dialect::Redshift, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
let stmt = StmtView::new(&result, 0);
let labels: Vec<_> = stmt.nodes.iter().map(|n| n.label.as_ref()).collect();
assert!(labels.contains(&"users"), "Should have users table");
let order_count_col = stmt
.nodes
.iter()
.find(|n| n.label.as_ref() == "order_count");
assert!(order_count_col.is_some(), "Should have order_count column");
assert!(
order_count_col
.unwrap()
.expression
.as_ref()
.map(|e| e.contains("orders"))
.unwrap_or(false),
"order_count expression should reference orders table"
);
}
#[test]
fn test_unload_with_cte() {
let sql = r#"
UNLOAD ('
WITH active_users AS (
SELECT id, name FROM users WHERE active = true
),
user_orders AS (
SELECT user_id, SUM(total) as total_spent
FROM orders
GROUP BY user_id
)
SELECT au.id, au.name, COALESCE(uo.total_spent, 0) as total_spent
FROM active_users au
LEFT JOIN user_orders uo ON au.id = uo.user_id
')
TO 's3://bucket/report/'
"#;
let result = run_analysis(sql, Dialect::Redshift, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
let stmt = StmtView::new(&result, 0);
let labels: Vec<_> = stmt.nodes.iter().map(|n| n.label.as_ref()).collect();
assert!(labels.contains(&"users"), "Should have users table");
assert!(labels.contains(&"orders"), "Should have orders table");
}
#[test]
fn test_copy_postgres_with_column_list_and_options() {
let sql = "COPY users (id, name, email) FROM '/data/users.csv' WITH (FORMAT CSV, HEADER true)";
let result = run_analysis(sql, Dialect::Postgres, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
let stmt = StmtView::new(&result, 0);
assert!(
stmt.nodes.iter().any(|n| n.label.as_ref() == "users"),
"Should have users table"
);
}
#[test]
fn test_copy_to_stdout() {
let sql = "COPY (SELECT id, name FROM users WHERE created_at > '2024-01-01') TO STDOUT";
let result = run_analysis(sql, Dialect::Postgres, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
let stmt = StmtView::new(&result, 0);
assert!(
stmt.nodes.iter().any(|n| n.label.as_ref() == "users"),
"Should have users table as source"
);
}
#[test]
fn test_unload_empty_query_string() {
let sql = "UNLOAD ('') TO 's3://bucket/out'";
let result = run_analysis(sql, Dialect::Redshift, None);
    assert!(
        !result.statements.is_empty(),
        "empty UNLOAD query string should still yield a parsed statement"
    );
}
#[test]
fn test_copy_from_multiple_files_pattern() {
let sql =
"COPY events FROM 's3://bucket/events/2024/*' IAM_ROLE 'arn:aws:iam::123:role/redshift'";
let result = run_analysis(sql, Dialect::Redshift, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
}
#[test]
fn test_alter_table_rename_preserves_case_sensitivity() {
let sql = r#"ALTER TABLE "MyTable" RENAME TO "my_table""#;
let result = run_analysis(sql, Dialect::Postgres, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
let stmt = StmtView::new(&result, 0);
let qualified_names: Vec<_> = stmt
.nodes
.iter()
.filter_map(|n| n.qualified_name.as_ref())
.map(|qn| qn.as_ref())
.collect();
assert!(
qualified_names.contains(&"MyTable"),
"Should preserve case for MyTable, got: {:?}",
qualified_names
);
assert!(
qualified_names.contains(&"my_table"),
"Should have my_table, got: {:?}",
qualified_names
);
}
#[test]
fn test_alter_table_rename_with_full_catalog_path() {
let sql = "ALTER TABLE analytics_db.reporting.legacy_orders RENAME TO analytics_db.reporting.orders_v2";
let result = run_analysis(sql, Dialect::Snowflake, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
let stmt = StmtView::new(&result, 0);
let tables: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(tables.len(), 2, "Should have source and target table nodes");
let qualified_names: Vec<_> = tables
.iter()
.filter_map(|t| t.qualified_name.as_ref())
.collect();
assert!(
qualified_names
.iter()
.any(|qn| qn.contains("LEGACY_ORDERS") || qn.contains("legacy_orders")),
"Should have source table, got: {:?}",
qualified_names
);
assert!(
qualified_names
.iter()
.any(|qn| qn.contains("ORDERS_V2") || qn.contains("orders_v2")),
"Should have target table, got: {:?}",
qualified_names
);
let rename_edges: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::DataFlow)
.filter(|e| e.operation.as_deref() == Some("RENAME"))
.collect();
assert_eq!(rename_edges.len(), 1, "Should have exactly one RENAME edge");
}
#[test]
fn test_alter_table_rename_inherits_catalog_when_partially_qualified() {
let sql = "ALTER TABLE production.sales.monthly_report RENAME TO quarterly_report";
let result = run_analysis(sql, Dialect::Snowflake, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
let stmt = StmtView::new(&result, 0);
let tables: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.collect();
assert_eq!(tables.len(), 2, "Should have source and target table nodes");
let rename_edges: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::DataFlow)
.filter(|e| e.operation.as_deref() == Some("RENAME"))
.collect();
assert_eq!(rename_edges.len(), 1, "Should have exactly one RENAME edge");
}
#[test]
fn test_alter_table_rename_cross_schema() {
let sql = "ALTER TABLE staging.raw_data RENAME TO production.cleaned_data";
let result = run_analysis(sql, Dialect::Snowflake, None);
assert!(
result.issues.iter().all(|i| i.severity != Severity::Error),
"Should not produce errors: {:?}",
result.issues
);
let stmt = StmtView::new(&result, 0);
let qualified_names: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Table)
.filter_map(|n| n.qualified_name.as_ref())
.collect();
    let has_staging = qualified_names
        .iter()
        .any(|qn| qn.to_uppercase().contains("STAGING"));
    let has_production = qualified_names
        .iter()
        .any(|qn| qn.to_uppercase().contains("PRODUCTION"));
assert!(
has_staging,
"Should reference staging schema, got: {:?}",
qualified_names
);
assert!(
has_production,
"Should reference production schema, got: {:?}",
qualified_names
);
let rename_edges: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::DataFlow)
.filter(|e| e.operation.as_deref() == Some("RENAME"))
.collect();
assert_eq!(rename_edges.len(), 1, "Should have exactly one RENAME edge");
}
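// Backward column inference: when a CTE selects `*` from a table with no schema
// metadata, the columns demanded by downstream projections are propagated back
// onto the source table as inferred column nodes, with matching data-flow edges.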
#[test]
fn backward_inference_basic_select_star() {
let sql = r#"
WITH orders AS (
SELECT * FROM stg_orders
),
customer_orders AS (
SELECT
customer_id,
COUNT(order_id) AS order_count
FROM orders
GROUP BY customer_id
)
SELECT * FROM customer_orders
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let stg_orders_node = find_table_node(&stmt, "stg_orders");
assert!(
stg_orders_node.is_some(),
"stg_orders table node should exist"
);
let stg_orders_id = &stg_orders_node.unwrap().id;
let stg_orders_column_edges: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.from == *stg_orders_id && e.edge_type == EdgeType::Ownership)
.collect();
assert_eq!(
stg_orders_column_edges.len(),
2,
"stg_orders should have exactly 2 inferred column ownership edges (customer_id, order_id), found {}",
stg_orders_column_edges.len()
);
let stg_orders_column_labels: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Column
&& n.qualified_name
.as_ref()
.map(|qn| qn.starts_with("stg_orders."))
.unwrap_or(false)
})
.map(|n| n.label.to_string())
.collect();
assert!(
stg_orders_column_labels.contains(&"customer_id".to_string()),
"customer_id should be inferred on stg_orders"
);
assert!(
stg_orders_column_labels.contains(&"order_id".to_string()),
"order_id should be inferred on stg_orders"
);
let stg_orders_customer_id = stmt
.nodes
.iter()
.find(|n| {
n.node_type == NodeType::Column
&& n.label.as_ref() == "customer_id"
&& n.qualified_name
.as_ref()
.map(|qn| qn.starts_with("stg_orders."))
.unwrap_or(false)
})
.expect("stg_orders.customer_id should exist");
let data_flow_from_stg_orders_customer_id: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.from == stg_orders_customer_id.id && e.edge_type == EdgeType::DataFlow)
.collect();
assert!(
!data_flow_from_stg_orders_customer_id.is_empty(),
"Data flow edge should exist from stg_orders.customer_id to orders.customer_id"
);
}
#[test]
fn backward_inference_select_star_in_derived_table() {
let sql = r#"
SELECT
o.customer_id
FROM (
SELECT * FROM raw_orders
) AS o
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let raw_orders_customer_id = stmt
.nodes
.iter()
.find(|n| {
n.node_type == NodeType::Column
&& n.qualified_name.as_deref() == Some("raw_orders.customer_id")
})
.expect("raw_orders.customer_id should be inferred through derived table");
let has_data_flow = stmt
.edges
.iter()
.any(|edge| edge.edge_type == EdgeType::DataFlow && edge.from == raw_orders_customer_id.id);
assert!(
has_data_flow,
"raw_orders.customer_id should participate in data flow after inference"
);
}
#[test]
fn backward_inference_transitive_chain() {
let sql = r#"
WITH step1 AS (
SELECT * FROM raw_events
),
step2 AS (
SELECT * FROM step1
),
step3 AS (
SELECT
event_id,
event_type,
user_id
FROM step2
)
SELECT * FROM step3
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let raw_events_node = find_table_node(&stmt, "raw_events");
assert!(
raw_events_node.is_some(),
"raw_events table node should exist"
);
let raw_events_columns: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Column
&& n.qualified_name
.as_ref()
.map(|qn| qn.starts_with("raw_events."))
.unwrap_or(false)
})
.map(|n| n.label.to_string())
.collect();
assert_eq!(
raw_events_columns.len(),
3,
"raw_events should have exactly 3 inferred columns (event_id, event_type, user_id), found: {:?}",
raw_events_columns
);
assert!(
raw_events_columns.contains(&"event_id".to_string()),
"event_id should be inferred on raw_events"
);
assert!(
raw_events_columns.contains(&"event_type".to_string()),
"event_type should be inferred on raw_events"
);
assert!(
raw_events_columns.contains(&"user_id".to_string()),
"user_id should be inferred on raw_events"
);
let raw_events_event_id = stmt
.nodes
.iter()
.find(|n| {
n.node_type == NodeType::Column
&& n.qualified_name.as_deref() == Some("raw_events.event_id")
})
.expect("raw_events.event_id should exist");
let data_flow_edges_from_event_id: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.from == raw_events_event_id.id && e.edge_type == EdgeType::DataFlow)
.collect();
assert!(
!data_flow_edges_from_event_id.is_empty(),
"Data flow edges should exist from raw_events.event_id through the CTE chain"
);
}
#[test]
fn backward_inference_multiple_sources() {
let sql = r#"
WITH orders AS (
SELECT * FROM stg_orders
),
customers AS (
SELECT * FROM stg_customers
),
combined AS (
SELECT
orders.order_id,
orders.order_date,
customers.customer_name,
customers.email
FROM orders
JOIN customers ON orders.customer_id = customers.customer_id
)
SELECT * FROM combined
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let stg_orders_columns: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Column
&& n.qualified_name
.as_ref()
.map(|qn| qn.starts_with("stg_orders."))
.unwrap_or(false)
})
.map(|n| n.label.to_string())
.collect();
assert_eq!(
stg_orders_columns.len(),
2,
"stg_orders should have exactly 2 inferred columns from SELECT, found: {:?}",
stg_orders_columns
);
assert!(
stg_orders_columns.contains(&"order_id".to_string()),
"order_id should be inferred on stg_orders"
);
assert!(
stg_orders_columns.contains(&"order_date".to_string()),
"order_date should be inferred on stg_orders"
);
let stg_customers_columns: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Column
&& n.qualified_name
.as_ref()
.map(|qn| qn.starts_with("stg_customers."))
.unwrap_or(false)
})
.map(|n| n.label.to_string())
.collect();
assert_eq!(
stg_customers_columns.len(),
2,
"stg_customers should have exactly 2 inferred columns from SELECT, found: {:?}",
stg_customers_columns
);
assert!(
stg_customers_columns.contains(&"customer_name".to_string()),
"customer_name should be inferred on stg_customers"
);
assert!(
stg_customers_columns.contains(&"email".to_string()),
"email should be inferred on stg_customers"
);
let stg_orders_order_id = stmt
.nodes
.iter()
.find(|n| {
n.node_type == NodeType::Column
&& n.qualified_name.as_deref() == Some("stg_orders.order_id")
})
.expect("stg_orders.order_id should exist");
let stg_customers_customer_name = stmt
.nodes
.iter()
.find(|n| {
n.node_type == NodeType::Column
&& n.qualified_name.as_deref() == Some("stg_customers.customer_name")
})
.expect("stg_customers.customer_name should exist");
let has_orders_data_flow = stmt
.edges
.iter()
.any(|e| e.from == stg_orders_order_id.id && e.edge_type == EdgeType::DataFlow);
let has_customers_data_flow = stmt
.edges
.iter()
.any(|e| e.from == stg_customers_customer_name.id && e.edge_type == EdgeType::DataFlow);
assert!(
has_orders_data_flow,
"Data flow edge should exist from stg_orders.order_id to orders CTE"
);
assert!(
has_customers_data_flow,
"Data flow edge should exist from stg_customers.customer_name to customers CTE"
);
}
#[test]
fn backward_inference_with_schema_no_inference_needed() {
let sql = r#"
WITH orders AS (
SELECT * FROM stg_orders
)
SELECT customer_id, order_id FROM orders
"#;
let schema = SchemaMetadata {
allow_implied: true,
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
tables: vec![schema_table(
None,
None,
"stg_orders",
&["customer_id", "order_id", "amount", "order_date"],
)],
};
let result = run_analysis(sql, Dialect::Generic, Some(schema));
let stmt = first_statement(&result);
let stg_orders_columns: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Column
&& n.qualified_name
.as_ref()
.map(|qn| qn.starts_with("stg_orders."))
.unwrap_or(false)
})
.collect();
assert!(
!stg_orders_columns.is_empty(),
"stg_orders should have columns from schema"
);
let approximate_issues: Vec<_> = result
.issues
.iter()
.filter(|i| i.code == issue_codes::APPROXIMATE_LINEAGE && i.message.contains("stg_orders"))
.collect();
assert!(
approximate_issues.is_empty(),
"Should not have approximate lineage issues when schema is provided"
);
}
#[test]
fn backward_inference_mixed_schema_and_no_schema() {
let sql = r#"
WITH orders AS (
SELECT * FROM stg_orders
),
customers AS (
SELECT * FROM stg_customers
),
combined AS (
SELECT
o.order_id,
c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
)
SELECT * FROM combined
"#;
let schema = SchemaMetadata {
allow_implied: true,
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
tables: vec![schema_table(
None,
None,
"stg_orders",
&["customer_id", "order_id", "amount"],
)],
};
let result = run_analysis(sql, Dialect::Generic, Some(schema));
let stmt = first_statement(&result);
let stg_orders_order_id = stmt.nodes.iter().find(|n| {
n.node_type == NodeType::Column
&& n.qualified_name.as_deref() == Some("stg_orders.order_id")
});
assert!(
stg_orders_order_id.is_some(),
"stg_orders.order_id should exist from schema"
);
let stg_customers_columns: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Column
&& n.qualified_name
.as_ref()
.map(|qn| qn.starts_with("stg_customers."))
.unwrap_or(false)
})
.map(|n| n.label.to_string())
.collect();
assert!(
stg_customers_columns.contains(&"customer_name".to_string()),
"customer_name should be inferred on stg_customers"
);
let approximate_issues: Vec<_> = result
.issues
.iter()
.filter(|i| i.code == issue_codes::APPROXIMATE_LINEAGE)
.collect();
assert!(
approximate_issues
.iter()
.any(|i| i.message.contains("stg_customers")),
"Should have approximate lineage issue for stg_customers"
);
}
#[test]
fn backward_inference_aggregation_preserves_column_names() {
let sql = r#"
WITH orders AS (
SELECT * FROM raw_orders
),
summary AS (
SELECT
customer_id,
COUNT(order_id) AS order_count,
SUM(amount) AS total_amount,
MIN(order_date) AS first_order
FROM orders
GROUP BY customer_id
)
SELECT * FROM summary
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let raw_orders_columns: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Column
&& n.qualified_name
.as_ref()
.map(|qn| qn.starts_with("raw_orders."))
.unwrap_or(false)
})
.map(|n| n.label.to_string())
.collect();
assert!(
raw_orders_columns.contains(&"customer_id".to_string()),
"customer_id should be inferred on raw_orders"
);
assert!(
raw_orders_columns.contains(&"order_id".to_string()),
"order_id should be inferred on raw_orders"
);
assert!(
raw_orders_columns.contains(&"amount".to_string()),
"amount should be inferred on raw_orders"
);
assert!(
raw_orders_columns.contains(&"order_date".to_string()),
"order_date should be inferred on raw_orders"
);
}
#[test]
fn backward_inference_deep_nesting() {
let sql = r#"
WITH step1 AS (SELECT * FROM source_table),
step2 AS (SELECT * FROM step1),
step3 AS (SELECT * FROM step2),
step4 AS (SELECT * FROM step3),
step5 AS (SELECT * FROM step4),
step6 AS (SELECT * FROM step5),
step7 AS (SELECT * FROM step6),
step8 AS (SELECT * FROM step7),
step9 AS (SELECT * FROM step8),
step10 AS (SELECT id, name, value FROM step9)
SELECT * FROM step10
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let source_columns: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Column
&& n.qualified_name
.as_ref()
.map(|qn| qn.starts_with("source_table."))
.unwrap_or(false)
})
.map(|n| n.label.to_string())
.collect();
assert_eq!(
source_columns.len(),
3,
"source_table should have exactly 3 inferred columns through 10-level chain, found: {:?}",
source_columns
);
assert!(
source_columns.contains(&"id".to_string()),
"id should be inferred on source_table"
);
assert!(
source_columns.contains(&"name".to_string()),
"name should be inferred on source_table"
);
assert!(
source_columns.contains(&"value".to_string()),
"value should be inferred on source_table"
);
let source_table_id = stmt
.nodes
.iter()
.find(|n| {
n.node_type == NodeType::Column
&& n.qualified_name.as_deref() == Some("source_table.id")
})
.expect("source_table.id should exist");
let data_flow_from_id: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.from == source_table_id.id && e.edge_type == EdgeType::DataFlow)
.collect();
assert!(
!data_flow_from_id.is_empty(),
"Data flow edges should exist from source_table.id through the deep CTE chain"
);
}
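// Non-recursive CTEs cannot form true reference cycles; the diamond below
// (base feeding both branch_a and branch_b, re-joined in combined) exercises
// the shared-node revisit path that any cycle guard in the inference pass
// must also handle.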
#[test]
fn backward_inference_cycle_detection() {
let sql = r#"
WITH base AS (
SELECT * FROM shared_source
),
branch_a AS (
SELECT * FROM base
),
branch_b AS (
SELECT * FROM base
),
combined AS (
SELECT
branch_a.id AS a_id,
branch_b.id AS b_id
FROM branch_a, branch_b
)
SELECT * FROM combined
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let source_columns: Vec<_> = stmt
.nodes
.iter()
.filter(|n| {
n.node_type == NodeType::Column
&& n.qualified_name
.as_ref()
.map(|qn| qn.starts_with("shared_source."))
.unwrap_or(false)
})
.map(|n| n.label.to_string())
.collect();
assert!(
source_columns.contains(&"id".to_string()),
"id should be inferred on shared_source, found: {:?}",
source_columns
);
let shared_source_id = stmt
.nodes
.iter()
.find(|n| {
n.node_type == NodeType::Column
&& n.qualified_name.as_deref() == Some("shared_source.id")
})
.expect("shared_source.id should exist");
let data_flow_from_shared_source: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.from == shared_source_id.id && e.edge_type == EdgeType::DataFlow)
.collect();
assert!(
!data_flow_from_shared_source.is_empty(),
"Data flow edges should exist from shared_source.id (referenced through both branches)"
);
}
#[test]
fn nested_joins_track_all_tables() {
let sql = r#"
SELECT
o.order_id,
c.email,
p.product_name,
s.supplier_name
FROM
(
(
(
orders o
JOIN customers c ON c.customer_id = o.customer_id
)
JOIN products p ON p.product_id = o.product_id
)
JOIN suppliers s ON s.supplier_id = p.supplier_id
)
WHERE c.email = 'sample@example.com'
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let tables = collect_table_names(&result);
for expected in ["orders", "customers", "products", "suppliers"] {
assert!(
tables.contains(expected),
"nested join should track table {expected}; saw {tables:?}"
);
}
let stmt = first_statement(&result);
let columns = column_labels(&stmt);
for expected in ["order_id", "email", "product_name", "supplier_name"] {
assert!(
columns.contains(&expected.to_string()),
"expected column {expected} in output; saw {columns:?}"
);
}
let data_flow_edges = edges_by_type(&stmt, EdgeType::DataFlow);
assert!(
!data_flow_edges.is_empty(),
"expected data flow edges for 4-way join"
);
}
#[test]
fn postgres_array_slicing_tracks_source_table() {
let sql = r#"
SELECT a[:], b[:1], c[2:], d[2:3]
FROM array_data
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("array_data"),
"array slicing query should track source table; saw {tables:?}"
);
let stmt = first_statement(&result);
assert!(
!result.summary.has_errors,
"array slicing query should parse without errors: {:?}",
result.issues
);
let table_node = find_table_node(&stmt, "array_data");
assert!(
table_node.is_some(),
"array_data table node should exist in lineage"
);
}
#[test]
fn lateral_join_tracks_outer_and_subquery_tables() {
let sql = r#"
SELECT
d.department_id,
d.name AS department_name,
emp.employee_name,
emp.salary
FROM departments d
JOIN LATERAL (
SELECT e.name AS employee_name, e.salary
FROM employees e
WHERE e.department_id = d.department_id
ORDER BY e.salary DESC
LIMIT 3
) emp ON true
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("departments"),
"LATERAL join should track outer table; saw {tables:?}"
);
assert!(
tables.contains("employees"),
"LATERAL join should track table inside LATERAL subquery; saw {tables:?}"
);
let stmt = first_statement(&result);
let columns = column_labels(&stmt);
for expected in [
"department_id",
"department_name",
"employee_name",
"salary",
] {
assert!(
columns.contains(&expected.to_string()),
"expected column {expected} in LATERAL join output; saw {columns:?}"
);
}
assert!(
!result.summary.has_errors,
"LATERAL join should parse without errors: {:?}",
result.issues
);
}
#[test]
fn filter_clause_tracks_aggregation_sources() {
let sql = r#"
SELECT
department_id,
SUM(salary) AS total_salary,
SUM(salary) FILTER (WHERE years_employed > 5) AS senior_salary,
AVG(salary) FILTER (WHERE performance_rating >= 4) AS high_performer_avg
FROM employees
GROUP BY department_id
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("employees"),
"FILTER clause query should track source table; saw {tables:?}"
);
let stmt = first_statement(&result);
let columns = column_labels(&stmt);
for expected in [
"department_id",
"total_salary",
"senior_salary",
"high_performer_avg",
] {
assert!(
columns.contains(&expected.to_string()),
"expected column {expected} in FILTER clause output; saw {columns:?}"
);
}
assert!(
!result.summary.has_errors,
"FILTER clause query should parse without errors: {:?}",
result.issues
);
}
#[test]
fn group_by_cube_rollup_tracks_source_table() {
let sql = r#"
SELECT
region,
city,
GROUPING(region, city) AS grp_idx,
COUNT(DISTINCT id) AS num_total
FROM locations
GROUP BY GROUPING SETS ((region), (city), (region, city), ())
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("locations"),
"GROUPING SETS query should track source table; saw {tables:?}"
);
let stmt = first_statement(&result);
let columns = column_labels(&stmt);
for expected in ["region", "city", "grp_idx", "num_total"] {
assert!(
columns.contains(&expected.to_string()),
"expected column {expected} in GROUPING SETS output; saw {columns:?}"
);
}
assert!(
!result.summary.has_errors,
"GROUPING SETS query should parse without errors: {:?}",
result.issues
);
}
#[test]
fn rollup_tracks_hierarchical_columns() {
let sql = r#"
SELECT
year,
quarter,
month,
SUM(revenue) AS total_revenue
FROM sales
GROUP BY ROLLUP (year, quarter, month)
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("sales"),
"ROLLUP query should track source table; saw {tables:?}"
);
let stmt = first_statement(&result);
let columns = column_labels(&stmt);
for expected in ["year", "quarter", "month", "total_revenue"] {
assert!(
columns.contains(&expected.to_string()),
"expected column {expected} in ROLLUP output; saw {columns:?}"
);
}
assert!(
!result.summary.has_errors,
"ROLLUP query should parse without errors: {:?}",
result.issues
);
}
#[test]
fn cube_tracks_all_dimensions() {
let sql = r#"
SELECT
product_category,
sales_region,
SUM(quantity) AS total_quantity,
SUM(amount) AS total_amount
FROM transactions
GROUP BY CUBE (product_category, sales_region)
"#;
let result = run_analysis(sql, Dialect::Postgres, None);
let tables = collect_table_names(&result);
assert!(
tables.contains("transactions"),
"CUBE query should track source table; saw {tables:?}"
);
let stmt = first_statement(&result);
let columns = column_labels(&stmt);
for expected in [
"product_category",
"sales_region",
"total_quantity",
"total_amount",
] {
assert!(
columns.contains(&expected.to_string()),
"expected column {expected} in CUBE output; saw {columns:?}"
);
}
assert!(
!result.summary.has_errors,
"CUBE query should parse without errors: {:?}",
result.issues
);
}
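// Snowflake normalizes unquoted identifiers to uppercase, so the assertions in
// these tests compare case-insensitively instead of pinning an exact casing.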
#[test]
fn snowflake_lateral_flatten_tracks_source_table() {
let sql = r#"
SELECT
value AS p_id,
name
FROM a
INNER JOIN b ON b.c_id = a.c_id,
LATERAL FLATTEN(input => b.cool_ids)
"#;
let result = run_analysis(sql, Dialect::Snowflake, None);
let tables = collect_table_names(&result);
let has_table_a = tables.iter().any(|t| t.eq_ignore_ascii_case("a"));
let has_table_b = tables.iter().any(|t| t.eq_ignore_ascii_case("b"));
assert!(
has_table_a,
"LATERAL FLATTEN query should track table a; saw {tables:?}"
);
assert!(
has_table_b,
"LATERAL FLATTEN query should track table b; saw {tables:?}"
);
assert!(
result.summary.statement_count >= 1,
"LATERAL FLATTEN query should parse at least one statement"
);
let stmt = first_statement(&result);
let columns = column_labels(&stmt);
assert!(
columns.iter().any(|c| c.eq_ignore_ascii_case("p_id")),
"LATERAL FLATTEN pseudocolumns should still produce visible output columns"
);
assert!(
columns.iter().any(|c| c.eq_ignore_ascii_case("name")),
"best-effort unresolved projections should remain visible alongside FLATTEN outputs"
);
}
#[test]
fn snowflake_higher_order_functions_track_source() {
let sql = r#"
SELECT
FILTER(ident, i -> i:value > 0) AS sample_filter,
TRANSFORM(ident, j -> j:value) AS sample_transform
FROM ref
"#;
let result = run_analysis(sql, Dialect::Snowflake, None);
let tables = collect_table_names(&result);
let has_ref = tables.iter().any(|t| t.eq_ignore_ascii_case("ref"));
assert!(
has_ref,
"Higher-order function query should track source table; saw {tables:?}"
);
assert!(
result.summary.statement_count >= 1,
"Higher-order function query should parse at least one statement"
);
}
#[test]
fn snowflake_reduce_keeps_output_column_when_lineage_resolution_is_partial() {
let sql = r#"
SELECT REDUCE([1, 2, 3], 0, (acc, val) -> acc + val) AS sum_result
"#;
let result = run_analysis(sql, Dialect::Snowflake, None);
let stmt = first_statement(&result);
let columns = column_labels(&stmt);
assert!(
columns.iter().any(|c| c.eq_ignore_ascii_case("sum_result")),
"partially-resolved higher-order functions should still keep their output column"
);
}
#[test]
fn snowflake_group_by_cube_tracks_source() {
let sql = r#"
SELECT
name,
age,
COUNT(*) AS record_count
FROM people
GROUP BY CUBE (name, age)
"#;
let result = run_analysis(sql, Dialect::Snowflake, None);
let tables = collect_table_names(&result);
let has_people = tables.iter().any(|t| t.eq_ignore_ascii_case("people"));
assert!(
has_people,
"Snowflake CUBE query should track source table; saw {tables:?}"
);
let stmt = first_statement(&result);
let columns = column_labels(&stmt);
for expected in ["NAME", "AGE", "RECORD_COUNT"] {
assert!(
columns.iter().any(|c| c.eq_ignore_ascii_case(expected)),
"expected column {expected} in Snowflake CUBE output; saw {columns:?}"
);
}
assert!(
!result.summary.has_errors,
"Snowflake CUBE query should parse without errors: {:?}",
result.issues
);
}
#[test]
fn snowflake_grouping_sets_tracks_source() {
let sql = r#"
SELECT
foo,
bar,
COUNT(*) AS cnt
FROM baz
GROUP BY GROUPING SETS ((foo), (bar))
"#;
let result = run_analysis(sql, Dialect::Snowflake, None);
let tables = collect_table_names(&result);
let has_baz = tables.iter().any(|t| t.eq_ignore_ascii_case("baz"));
assert!(
has_baz,
"Snowflake GROUPING SETS query should track source table; saw {tables:?}"
);
let stmt = first_statement(&result);
let columns = column_labels(&stmt);
for expected in ["FOO", "BAR", "CNT"] {
assert!(
columns.iter().any(|c| c.eq_ignore_ascii_case(expected)),
"expected column {expected} in Snowflake GROUPING SETS output; saw {columns:?}"
);
}
assert!(
!result.summary.has_errors,
"Snowflake GROUPING SETS query should parse without errors: {:?}",
result.issues
);
}
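/// Reads the inferred `data_type` entry, if any, from a node's metadata map.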
fn get_column_data_type(node: &Node) -> Option<String> {
node.metadata
.as_ref()
.and_then(|m| m.get("data_type"))
.and_then(|v| v.as_str())
.map(|s| s.to_string())
}
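// Type inference tests. Inferred types are normalized to a small canonical set
// (INTEGER, FLOAT, TEXT, BOOLEAN, TIMESTAMP) regardless of the dialect's own
// spelling of the type.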
#[test]
fn type_inference_select_literals_have_correct_types() {
let sql = r#"
SELECT
1 AS int_val,
'text' AS text_val,
true AS bool_val
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let int_col = find_column_node(&stmt, "int_val").expect("int_val column should exist");
let text_col = find_column_node(&stmt, "text_val").expect("text_val column should exist");
let bool_col = find_column_node(&stmt, "bool_val").expect("bool_val column should exist");
assert_eq!(
get_column_data_type(int_col),
Some("FLOAT".to_string()),
"Integer literal should infer as FLOAT"
);
assert_eq!(
get_column_data_type(text_col),
Some("TEXT".to_string()),
"String literal should infer as TEXT"
);
assert_eq!(
get_column_data_type(bool_col),
Some("BOOLEAN".to_string()),
"Boolean literal should infer as BOOLEAN"
);
}
#[test]
fn type_inference_select_functions_have_correct_types() {
let sql = r#"
SELECT
COUNT(*) AS count_val,
SUM(amount) AS sum_val,
CONCAT(first_name, last_name) AS concat_val,
NOW() AS timestamp_val
FROM users
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let count_col = find_column_node(&stmt, "count_val").expect("count_val column should exist");
let sum_col = find_column_node(&stmt, "sum_val").expect("sum_val column should exist");
let concat_col = find_column_node(&stmt, "concat_val").expect("concat_val column should exist");
let timestamp_col =
find_column_node(&stmt, "timestamp_val").expect("timestamp_val column should exist");
assert_eq!(
get_column_data_type(count_col),
Some("INTEGER".to_string()),
"COUNT(*) should infer as INTEGER"
);
assert_eq!(
get_column_data_type(sum_col),
Some("FLOAT".to_string()),
"SUM() should infer as FLOAT"
);
assert_eq!(
get_column_data_type(concat_col),
Some("TEXT".to_string()),
"CONCAT() should infer as TEXT"
);
assert_eq!(
get_column_data_type(timestamp_col),
Some("TIMESTAMP".to_string()),
"NOW() should infer as TIMESTAMP"
);
}
#[test]
fn type_inference_cte_types_propagate_to_outer_query() {
let sql = r#"
WITH metrics AS (
SELECT
COUNT(*) AS row_count,
SUM(amount) AS total_amount,
CONCAT(name, '_suffix') AS name_with_suffix
FROM orders
)
SELECT
row_count,
total_amount,
name_with_suffix
FROM metrics
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let cte_node = find_cte_node(&stmt, "metrics").expect("metrics CTE should exist");
let cte_columns: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::Ownership && e.from == cte_node.id)
.filter_map(|e| stmt.nodes.iter().find(|n| n.id == e.to))
.collect();
let cte_row_count = cte_columns
.iter()
.find(|n| &*n.label == "row_count")
.expect("row_count should be in CTE");
let cte_total_amount = cte_columns
.iter()
.find(|n| &*n.label == "total_amount")
.expect("total_amount should be in CTE");
let cte_name_with_suffix = cte_columns
.iter()
.find(|n| &*n.label == "name_with_suffix")
.expect("name_with_suffix should be in CTE");
assert_eq!(
get_column_data_type(cte_row_count),
Some("INTEGER".to_string()),
"CTE row_count should be INTEGER"
);
assert_eq!(
get_column_data_type(cte_total_amount),
Some("FLOAT".to_string()),
"CTE total_amount should be FLOAT"
);
assert_eq!(
get_column_data_type(cte_name_with_suffix),
Some("TEXT".to_string()),
"CTE name_with_suffix should be TEXT"
);
let output_node = stmt
.nodes
.iter()
.find(|n| n.node_type == NodeType::Output)
.expect("Output node should exist");
let output_columns: Vec<_> = stmt
.edges
.iter()
.filter(|e| e.edge_type == EdgeType::Ownership && e.from == output_node.id)
.filter_map(|e| stmt.nodes.iter().find(|n| n.id == e.to))
.collect();
let out_row_count = output_columns
.iter()
.find(|n| &*n.label == "row_count")
.expect("row_count should be in output");
let out_total_amount = output_columns
.iter()
.find(|n| &*n.label == "total_amount")
.expect("total_amount should be in output");
let out_name_with_suffix = output_columns
.iter()
.find(|n| &*n.label == "name_with_suffix")
.expect("name_with_suffix should be in output");
assert_eq!(
get_column_data_type(out_row_count),
Some("INTEGER".to_string()),
"Output row_count should propagate INTEGER from CTE"
);
assert_eq!(
get_column_data_type(out_total_amount),
Some("FLOAT".to_string()),
"Output total_amount should propagate FLOAT from CTE"
);
assert_eq!(
get_column_data_type(out_name_with_suffix),
Some("TEXT".to_string()),
"Output name_with_suffix should propagate TEXT from CTE"
);
}
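/// Builds an uncatalogued SchemaTable from pre-typed columns (see `column_typed`).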
fn schema_table_typed(name: &str, columns: Vec<ColumnSchema>) -> SchemaTable {
SchemaTable {
catalog: None,
schema: None,
name: name.to_string(),
columns,
}
}
#[test]
fn test_column_reference_with_schema_returns_correct_type() {
let schema = SchemaMetadata {
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
allow_implied: true,
tables: vec![schema_table_typed(
"users",
vec![
column_typed("id", "integer"),
column_typed("email", "varchar"),
column_typed("created_at", "timestamp"),
column_typed("is_active", "boolean"),
],
)],
};
let sql = r#"
SELECT id, email, created_at, is_active
FROM users
"#;
let result = run_analysis(sql, Dialect::Generic, Some(schema));
    assert!(
        result.issues.is_empty(),
        "Should have no issues: {:?}",
        result.issues
    );
let stmt = StmtView::new(&result, 0);
let id_col = find_column_node(&stmt, "id").expect("id column should exist");
let email_col = find_column_node(&stmt, "email").expect("email column should exist");
let created_at_col =
find_column_node(&stmt, "created_at").expect("created_at column should exist");
let is_active_col =
find_column_node(&stmt, "is_active").expect("is_active column should exist");
assert_eq!(
get_column_data_type(id_col),
Some("INTEGER".to_string()),
"id should be INTEGER from schema"
);
assert_eq!(
get_column_data_type(email_col),
Some("TEXT".to_string()),
"email (varchar) should normalize to TEXT from schema"
);
assert_eq!(
get_column_data_type(created_at_col),
Some("TIMESTAMP".to_string()),
"created_at should be TIMESTAMP from schema"
);
assert_eq!(
get_column_data_type(is_active_col),
Some("BOOLEAN".to_string()),
"is_active should be BOOLEAN from schema"
);
}
#[test]
fn test_column_reference_without_schema_returns_none() {
let sql = r#"
SELECT id, email
FROM users
"#;
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = StmtView::new(&result, 0);
let id_col = find_column_node(&stmt, "id").expect("id column should exist");
let email_col = find_column_node(&stmt, "email").expect("email column should exist");
assert_eq!(
get_column_data_type(id_col),
None,
"id should have no type without schema"
);
assert_eq!(
get_column_data_type(email_col),
None,
"email should have no type without schema"
);
}
#[test]
fn test_qualified_column_reference_with_schema() {
let schema = SchemaMetadata {
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
allow_implied: true,
tables: vec![
schema_table_typed(
"users",
vec![column_typed("id", "integer"), column_typed("name", "text")],
),
schema_table_typed(
"orders",
vec![
column_typed("id", "integer"),
column_typed("user_id", "integer"),
column_typed("total", "numeric"),
],
),
],
};
let sql = r#"
SELECT users.id, users.name, orders.total
FROM users
JOIN orders ON users.id = orders.user_id
"#;
let result = run_analysis(sql, Dialect::Generic, Some(schema));
    assert!(
        result.issues.is_empty(),
        "Should have no issues: {:?}",
        result.issues
    );
let stmt = StmtView::new(&result, 0);
let id_col = find_column_node(&stmt, "id").expect("id column should exist");
let name_col = find_column_node(&stmt, "name").expect("name column should exist");
let total_col = find_column_node(&stmt, "total").expect("total column should exist");
assert_eq!(
get_column_data_type(id_col),
Some("INTEGER".to_string()),
"users.id should be INTEGER from schema"
);
assert_eq!(
get_column_data_type(name_col),
Some("TEXT".to_string()),
"users.name should be TEXT from schema"
);
assert_eq!(
get_column_data_type(total_col),
Some("FLOAT".to_string()),
"orders.total (numeric) should normalize to FLOAT from schema"
);
}
#[test]
fn test_schema_type_normalization() {
let schema = SchemaMetadata {
default_catalog: None,
default_schema: None,
search_path: None,
case_sensitivity: None,
allow_implied: true,
tables: vec![schema_table_typed(
"test_types",
vec![
column_typed("int64_col", "int64"), column_typed("varchar_col", "varchar"), column_typed("float8_col", "float8"), column_typed("datetime_col", "datetime"), column_typed("bool_col", "bool"), ],
)],
};
let sql = r#"
SELECT int64_col, varchar_col, float8_col, datetime_col, bool_col
FROM test_types
"#;
let result = run_analysis(sql, Dialect::Generic, Some(schema));
    assert!(
        result.issues.is_empty(),
        "Should have no issues: {:?}",
        result.issues
    );
let stmt = StmtView::new(&result, 0);
let int64_col = find_column_node(&stmt, "int64_col").expect("int64_col should exist");
let varchar_col = find_column_node(&stmt, "varchar_col").expect("varchar_col should exist");
let float8_col = find_column_node(&stmt, "float8_col").expect("float8_col should exist");
let datetime_col = find_column_node(&stmt, "datetime_col").expect("datetime_col should exist");
let bool_col = find_column_node(&stmt, "bool_col").expect("bool_col should exist");
assert_eq!(
get_column_data_type(int64_col),
Some("INTEGER".to_string()),
"int64 should normalize to INTEGER"
);
assert_eq!(
get_column_data_type(varchar_col),
Some("TEXT".to_string()),
"varchar should normalize to TEXT"
);
assert_eq!(
get_column_data_type(float8_col),
Some("FLOAT".to_string()),
"float8 should normalize to FLOAT"
);
assert_eq!(
get_column_data_type(datetime_col),
Some("TIMESTAMP".to_string()),
"datetime should normalize to TIMESTAMP"
);
assert_eq!(
get_column_data_type(bool_col),
Some("BOOLEAN".to_string()),
"bool should normalize to BOOLEAN"
);
}
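// Name-span tests: `name_spans` holds byte ranges into the analyzed SQL where a
// relation's name occurs, skipping string literals and comments; `body_span`
// covers a CTE's parenthesized body. The fixtures below deliberately avoid
// leading whitespace, since `run_analysis` trims its input and the assertions
// index into the untrimmed local string.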
#[test]
fn name_spans_single_table_reference() {
let sql = "SELECT * FROM users WHERE id = 1";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let users = find_table_node(&stmt, "users").expect("users table node");
assert_eq!(users.name_spans.len(), 1);
let span = users.name_spans[0];
assert_eq!(&sql[span.start..span.end], "users");
assert!(users.body_span.is_none());
}
#[test]
fn name_spans_multiple_table_references() {
let sql = "SELECT u.id FROM users u WHERE u.id IN (SELECT id FROM users)";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let users = find_table_node(&stmt, "users").expect("users table node");
assert_eq!(
users.name_spans.len(),
2,
"expected both `users` references"
);
for span in &users.name_spans {
assert_eq!(&sql[span.start..span.end], "users");
}
}
#[test]
fn name_spans_cte_with_body_and_references() {
let sql = "WITH active AS (SELECT id FROM users WHERE active) \
SELECT a.id FROM active a JOIN active b ON a.id = b.id";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let active_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|node| {
node.node_type == NodeType::Cte && node.qualified_name.as_deref() == Some("active")
})
.collect();
assert!(
!active_nodes.is_empty(),
"expected at least one CTE node for `active`"
);
let total_spans: Vec<_> = active_nodes
.iter()
.flat_map(|node| node.name_spans.iter().copied())
.collect();
assert_eq!(
total_spans.len(),
3,
"expected declaration plus two relation occurrences across CTE instances"
);
for span in &total_spans {
assert_eq!(&sql[span.start..span.end], "active");
}
let cte_definition = active_nodes
.iter()
.find(|node| node.body_span.is_some())
.expect("cte definition node should retain body span");
let body = cte_definition
.body_span
.expect("cte body span should be populated");
let body_text = &sql[body.start..body.end];
assert!(body_text.starts_with('(') && body_text.ends_with(')'));
assert!(body_text.contains("SELECT id FROM users WHERE active"));
}
#[test]
fn name_spans_ignores_matches_in_strings_and_comments() {
let sql = "-- users\nSELECT * FROM users WHERE note = 'see users'";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let users = find_table_node(&stmt, "users").expect("users table node");
assert_eq!(
users.name_spans.len(),
1,
"matches inside comments/strings must not count"
);
let span = users.name_spans[0];
assert_eq!(&sql[span.start..span.end], "users");
}
#[test]
fn name_spans_skip_string_literals_before_relation_occurrence() {
let sql = "SELECT 'users' AS label FROM users";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let users = find_table_node(&stmt, "users").expect("users table node");
assert_eq!(users.name_spans.len(), 1);
let span = users.name_spans[0];
assert_eq!(&sql[span.start..span.end], "users");
assert_eq!(
span.start, 29,
"should bind to FROM users, not the string literal"
);
}
#[test]
fn name_spans_skip_hash_comments_in_mysql() {
let sql = "SELECT 1 # users\nFROM users";
let result = run_analysis(sql, Dialect::Mysql, None);
let stmt = StmtView::new(&result, 0);
let users = find_table_node(&stmt, "users").expect("users table node");
assert_eq!(users.name_spans.len(), 1);
let span = users.name_spans[0];
assert_eq!(&sql[span.start..span.end], "users");
assert_eq!(
span.start, 22,
"should bind to FROM users, not the hash comment"
);
}
#[test]
fn name_spans_skip_dollar_quoted_string_literals() {
let sql = "SELECT $$users$$ AS x FROM users";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let users = find_table_node(&stmt, "users").expect("users table node");
assert_eq!(users.name_spans.len(), 1);
let span = users.name_spans[0];
assert_eq!(&sql[span.start..span.end], "users");
assert_eq!(
span.start, 27,
"should bind to FROM users, not the dollar-quoted literal"
);
}
#[test]
fn name_spans_empty_on_column_nodes() {
let sql = "SELECT id FROM users";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let id_col = find_column_node(&stmt, "id").expect("id column node");
assert!(
id_col.name_spans.is_empty(),
"column nodes should not populate name_spans in this release"
);
}
#[test]
fn name_spans_recursive_cte_include_recursive_reference() {
let sql = concat!(
"WITH RECURSIVE org AS (",
"SELECT id FROM employees ",
"UNION ALL ",
"SELECT e.id FROM employees e JOIN org ON e.manager_id = org.id",
") SELECT * FROM org"
);
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let org = find_cte_node(&stmt, "org").expect("org cte node");
assert_eq!(
org.name_spans.len(),
3,
"definition, recursive self-reference, and outer reference should all be tracked"
);
for span in &org.name_spans {
assert_eq!(&sql[span.start..span.end], "org");
}
}
#[test]
fn name_spans_self_join_are_instance_specific() {
let sql =
"SELECT e1.name, e2.name FROM employees e1 JOIN employees e2 ON e1.manager_id = e2.id";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let employee_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|node| {
node.node_type == NodeType::Table && node.qualified_name.as_deref() == Some("employees")
})
.collect();
assert_eq!(
employee_nodes.len(),
2,
"expected one node per self-join instance"
);
assert!(employee_nodes.iter().all(|node| node.name_spans.len() == 1));
assert_ne!(
employee_nodes[0].name_spans[0],
employee_nodes[1].name_spans[0]
);
for node in employee_nodes {
let span = node.name_spans[0];
assert_eq!(&sql[span.start..span.end], "employees");
}
}
#[test]
fn name_spans_distinguish_same_label_qualified_relations() {
let sql = "SELECT * FROM sales.orders so JOIN archive.orders ao ON so.id = ao.id";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let sales_orders = stmt
.nodes
.iter()
.find(|node| node.qualified_name.as_deref() == Some("sales.orders"))
.expect("sales.orders node");
let archive_orders = stmt
.nodes
.iter()
.find(|node| node.qualified_name.as_deref() == Some("archive.orders"))
.expect("archive.orders node");
assert_eq!(sales_orders.name_spans.len(), 1);
assert_eq!(archive_orders.name_spans.len(), 1);
assert_ne!(sales_orders.name_spans[0], archive_orders.name_spans[0]);
assert_eq!(
&sql[sales_orders.name_spans[0].start..sales_orders.name_spans[0].end],
"orders"
);
assert_eq!(
&sql[archive_orders.name_spans[0].start..archive_orders.name_spans[0].end],
"orders"
);
}
#[test]
fn name_spans_preserve_quoted_identifier_parts_with_embedded_dots() {
let sql = "SELECT * FROM \"my.schema\".\"my.table\"";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let table = stmt
.nodes
.iter()
.find(|node| node.node_type == NodeType::Table)
.expect("quoted table node");
assert_eq!(table.name_spans.len(), 1);
let span = table.name_spans[0];
assert_eq!(&sql[span.start..span.end], "my.table");
}
#[test]
fn cte_body_span_skips_optional_column_list() {
let sql =
"WITH metrics(user_id, total) AS (SELECT id, amount FROM orders) SELECT * FROM metrics";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let metrics = find_cte_node(&stmt, "metrics").expect("metrics cte node");
let body = metrics
.body_span
.expect("cte body span should be populated");
assert_eq!(
&sql[body.start..body.end],
"(SELECT id, amount FROM orders)"
);
}
#[test]
fn cte_body_span_skips_materialization_modifiers() {
let sql = "WITH metrics AS NOT MATERIALIZED (SELECT id FROM orders) SELECT * FROM metrics";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let metrics = find_cte_node(&stmt, "metrics").expect("metrics cte node");
let body = metrics
.body_span
.expect("cte body span should be populated");
assert_eq!(&sql[body.start..body.end], "(SELECT id FROM orders)");
}
#[test]
fn cte_body_span_skips_dollar_quoted_strings() {
let sql = "WITH metrics AS (SELECT $$)$$ AS x) SELECT * FROM metrics";
let result = run_analysis(sql, Dialect::Postgres, None);
let stmt = StmtView::new(&result, 0);
let metrics = find_cte_node(&stmt, "metrics").expect("metrics cte node");
let body = metrics
.body_span
.expect("cte body span should be populated");
assert_eq!(&sql[body.start..body.end], "(SELECT $$)$$ AS x)");
}
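// Oracle fixture tests: statements are loaded from tests/fixtures/oracle via
// load_sql_fixture rather than inlined, mirroring real ETL scripts.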
#[test]
fn oracle_insert_simple() {
let sql = load_sql_fixture("oracle", "01_insert_simple.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
assert!(
!result.summary.has_errors,
"oracle insert_simple should parse without errors: {:?}",
result.issues
);
let stmt = first_statement(&result);
let tables = collect_table_names(&result);
assert!(
tables.contains("CORE.REG_SUBJECT"),
"should find target table CORE.REG_SUBJECT"
);
assert!(
tables.contains("IDM.REG_SUBJECT"),
"should find source table IDM.REG_SUBJECT"
);
assert!(
tables.contains("IDM.REG_SUBJECTTYPE"),
"should find source table IDM.REG_SUBJECTTYPE"
);
assert!(!stmt.edges.is_empty(), "should have lineage edges");
}
#[test]
fn oracle_select_only() {
let sql = load_sql_fixture("oracle", "02_select_only.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
assert!(
!result.summary.has_errors,
"oracle select_only should parse without errors: {:?}",
result.issues
);
let tables = collect_table_names(&result);
assert!(
tables.contains("IDM.REG_SUBJECT"),
"should find source table IDM.REG_SUBJECT"
);
}
#[test]
fn oracle_update_alias() {
let sql = load_sql_fixture("oracle", "03_update_alias.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
assert!(
!result.summary.has_errors,
"oracle update_alias should parse without errors: {:?}",
result.issues
);
let tables = collect_table_names(&result);
assert!(
tables.contains("CORE.REG_SUBJECT"),
"should find target table CORE.REG_SUBJECT"
);
}
#[test]
fn oracle_delete_exists() {
let sql = load_sql_fixture("oracle", "04_delete_exists.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
assert!(
!result.summary.has_errors,
"oracle delete_exists should parse without errors: {:?}",
result.issues
);
let tables = collect_table_names(&result);
assert!(
tables.contains("CORE.REG_SUBJECT"),
"should find target table CORE.REG_SUBJECT"
);
}
#[test]
fn oracle_merge_minimal() {
let sql = load_sql_fixture("oracle", "05_merge_minimal.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
assert!(
!result.summary.has_errors,
"oracle merge_minimal should parse without errors: {:?}",
result.issues
);
let tables = collect_table_names(&result);
assert!(
tables.contains("CORE.REG_SUBJECT"),
"should find target table CORE.REG_SUBJECT"
);
assert!(
tables.contains("IDM.REG_SUBJECT"),
"should find source table IDM.REG_SUBJECT"
);
}
#[test]
fn oracle_view_simple() {
let sql = load_sql_fixture("oracle", "06_view_simple.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
assert!(
!result.summary.has_errors,
"oracle view_simple should parse without errors: {:?}",
result.issues
);
let stmt = first_statement(&result);
let tables = collect_table_names(&result);
assert!(
tables.contains("IDM.REG_SUBJECT"),
"should find source table IDM.REG_SUBJECT"
);
assert!(!stmt.edges.is_empty(), "should have lineage edges");
}
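/// Runs an Oracle fixture and condenses the outcome into
/// (parse_ok, table_count, total_edge_count, joined_error_messages).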
fn oracle_fixture_result(fixture: &str) -> (bool, usize, usize, String) {
let sql = load_sql_fixture("oracle", fixture);
let result = run_analysis(&sql, Dialect::Oracle, None);
let parse_ok = !result.summary.has_errors;
let tables = collect_table_names(&result);
let edge_count: usize = result
.statements
.iter()
.map(|s| result.edges_in_statement(s.statement_index).count())
.sum();
let error_detail = if !parse_ok {
result
.issues
.iter()
.map(|i| format!("{}: {}", i.code, i.message))
.collect::<Vec<_>>()
.join("; ")
} else {
String::new()
};
(parse_ok, tables.len(), edge_count, error_detail)
}
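// Bulk fixture smoke tests: each asserts parse success plus coarse lower bounds
// on table and edge counts rather than exact lineage shapes.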
#[test]
fn oracle_bulk_delete_simple() {
let (ok, tables, _, err) = oracle_fixture_result("delete_simple.sql");
assert!(ok, "delete_simple parse failed: {err}");
assert!(tables >= 1, "should find at least 1 table");
}
#[test]
fn oracle_bulk_delete_in() {
let (ok, tables, _, err) = oracle_fixture_result("delete_in.sql");
assert!(ok, "delete_in parse failed: {err}");
assert!(tables >= 2, "should find at least 2 tables");
}
#[test]
fn oracle_bulk_delete_exists() {
let (ok, tables, _, err) = oracle_fixture_result("delete_exists.sql");
assert!(ok, "delete_exists parse failed: {err}");
assert!(tables >= 2, "should find at least 2 tables");
}
#[test]
fn oracle_bulk_insert_simple() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_simple.sql");
assert!(ok, "insert_simple parse failed: {err}");
assert!(tables >= 3, "should find target + 2 sources");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_values() {
let (ok, tables, _, err) = oracle_fixture_result("insert_values.sql");
assert!(ok, "insert_values parse failed: {err}");
assert!(tables >= 1, "should find target table");
}
#[test]
fn oracle_bulk_insert_select_literals() {
let (ok, tables, _, err) = oracle_fixture_result("insert_select_literals.sql");
assert!(ok, "insert_select_literals parse failed: {err}");
assert!(tables >= 1, "should find at least target table");
}
#[test]
fn oracle_bulk_insert_target_source() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_target_source.sql");
assert!(ok, "insert_target_source parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_cte_chain_deep() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_cte_chain_deep.sql");
assert!(ok, "insert_cte_chain_deep parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_cte_sum_case() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_cte_sum_case.sql");
assert!(ok, "insert_cte_sum_case parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_from_one_subquery() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_from_one_subquery.sql");
assert!(ok, "insert_from_one_subquery parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_into_subquery_target() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_into_subquery_target.sql");
assert!(ok, "insert_into_subquery_target parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_subquery_chain() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_subquery_chain.sql");
assert!(ok, "insert_subquery_chain parse failed: {err}");
assert!(tables >= 3, "should find target + 2 sources");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_union_all_sources() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_union_all_sources.sql");
assert!(ok, "insert_union_all_sources parse failed: {err}");
assert!(tables >= 3, "should find target + 2 source tables");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_case_expr() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_case_expr.sql");
assert!(ok, "insert_case_expr parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_window_rn() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_window_rn.sql");
assert!(ok, "insert_window_rn parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_values_with_subquery() {
let (ok, tables, _, err) = oracle_fixture_result("insert_values_with_subquery.sql");
assert!(ok, "insert_values_with_subquery parse failed: {err}");
assert!(tables >= 1, "should find at least target table");
}
#[test]
fn oracle_bulk_insert_bare_star() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_bare_star.sql");
assert!(ok, "insert_bare_star parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_table_star() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_table_star.sql");
assert!(ok, "insert_table_star parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_from_subquery_star() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_from_subquery_star.sql");
assert!(ok, "insert_from_subquery_star parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_from_view() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_from_view.sql");
assert!(ok, "insert_from_view parse failed: {err}");
assert!(tables >= 2, "should find target + view source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_join_subquery_stars() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_join_subquery_stars.sql");
assert!(ok, "insert_join_subquery_stars parse failed: {err}");
assert!(tables >= 3, "should find target + 2 sources");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_cte_mixed() {
let (ok, tables, _, err) = oracle_fixture_result("insert_cte_mixed.sql");
assert!(ok, "insert_cte_mixed parse failed: {err}");
assert!(tables >= 3, "should find target + 2 CTE source tables");
}
#[test]
fn oracle_bulk_insert_table_star_join_meta() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_table_star_join_meta.sql");
assert!(ok, "insert_table_star_join_meta parse failed: {err}");
assert!(tables >= 3, "should find target + 2 sources");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_star_source() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_star_source.sql");
assert!(ok, "insert_star_source parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_no_target_fields() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_no_target_fields.sql");
assert!(ok, "insert_no_target_fields parse failed: {err}");
assert!(tables >= 3, "should find target + 2 sources");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_positional_no_aliases() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_positional_no_aliases_oracle.sql");
assert!(ok, "insert_positional_no_aliases parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_scalar_agg_multi_tables() {
let (ok, tables, edges, err) =
oracle_fixture_result("insert_scalar_agg_multi_tables_oracle.sql");
assert!(ok, "insert_scalar_agg_multi_tables parse failed: {err}");
assert!(tables >= 4, "should find target + multiple source tables");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_star_minus() {
let (ok, tables, edges, err) = oracle_fixture_result("insert_star_minus_oracle.sql");
assert!(ok, "insert_star_minus parse failed: {err}");
assert!(tables >= 3, "should find target + 2 source tables");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_insert_rms_fct_segment() {
let (ok, tables, _, err) = oracle_fixture_result("insert_rms_fct_segment.sql");
assert!(ok, "insert_rms_fct_segment parse failed: {err}");
assert!(tables >= 2, "should find target + sources");
}
#[test]
fn oracle_bulk_insert_bm_det_customer() {
let (ok, tables, _, err) = oracle_fixture_result("insert_bm_det_customer.sql");
assert!(ok, "insert_bm_det_customer parse failed: {err}");
assert!(tables >= 2, "should find target + sources");
}
#[test]
fn oracle_bulk_merge_minimal() {
let (ok, tables, edges, err) = oracle_fixture_result("merge_minimal.sql");
assert!(ok, "merge_minimal parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_merge_when_matched_only() {
let (ok, tables, edges, err) = oracle_fixture_result("merge_when_matched_only.sql");
assert!(ok, "merge_when_matched_only parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_merge_when_not_matched_only() {
let (ok, tables, edges, err) = oracle_fixture_result("merge_when_not_matched_only.sql");
assert!(ok, "merge_when_not_matched_only parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_merge_using_join() {
let (ok, tables, edges, err) = oracle_fixture_result("merge_using_join.sql");
assert!(ok, "merge_using_join parse failed: {err}");
assert!(tables >= 3, "should find target + 2 sources");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_merge_using_direct_table() {
let (ok, tables, edges, err) = oracle_fixture_result("merge_using_direct_table.sql");
assert!(ok, "merge_using_direct_table parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_merge_union_all() {
let (ok, tables, _, err) = oracle_fixture_result("merge_union_all.sql");
assert!(ok, "merge_union_all parse failed: {err}");
assert!(tables >= 3, "should find target + 2 source tables");
}
#[test]
fn oracle_bulk_merge_into_subquery_target() {
let (ok, tables, _, err) = oracle_fixture_result("merge_into_subquery_target.sql");
assert!(ok, "merge_into_subquery_target parse failed: {err}");
assert!(tables >= 2, "should find target + source");
}
#[test]
fn oracle_bulk_merge_using_star_subquery() {
let (ok, tables, edges, err) = oracle_fixture_result("merge_using_star_subquery.sql");
assert!(ok, "merge_using_star_subquery parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_merge_using_table_star() {
let (ok, tables, edges, err) = oracle_fixture_result("merge_using_table_star.sql");
assert!(ok, "merge_using_table_star parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_merge_using_star_join_meta() {
let (ok, tables, _, err) = oracle_fixture_result("merge_using_star_join_meta.sql");
assert!(ok, "merge_using_star_join_meta parse failed: {err}");
assert!(tables >= 3, "should find target + 2 sources");
}
#[test]
fn oracle_bulk_merge_using_cte_star_meta() {
let (ok, tables, _, err) = oracle_fixture_result("merge_using_cte_star_meta.sql");
assert!(ok, "merge_using_cte_star_meta parse failed: {err}");
assert!(tables >= 3, "should find target + 2 sources");
}
#[test]
fn oracle_bulk_merge_complex_unqualified() {
let (ok, tables, _, err) = oracle_fixture_result("merge_complex_unqualified_oracle.sql");
assert!(ok, "merge_complex_unqualified parse failed: {err}");
assert!(tables >= 4, "should find target + multiple sources");
}
#[test]
fn oracle_bulk_merge_rms_agg_overdue() {
let (ok, tables, _, err) = oracle_fixture_result("merge_rms_agg_overdue.sql");
assert!(ok, "merge_rms_agg_overdue parse failed: {err}");
assert!(tables >= 2, "should find target + sources");
}
#[test]
fn oracle_bulk_merge_rms_det_overdue() {
let (ok, tables, _, err) = oracle_fixture_result("merge_rms_det_overdue.sql");
assert!(ok, "merge_rms_det_overdue parse failed: {err}");
assert!(tables >= 2, "should find target + sources");
}
#[test]
fn oracle_bulk_select_only() {
let (ok, tables, _, err) = oracle_fixture_result("select_only.sql");
assert!(ok, "select_only parse failed: {err}");
assert!(tables >= 1, "should find source table");
}
#[test]
fn oracle_bulk_select_complex() {
let (ok, tables, _, err) = oracle_fixture_result("select_complex.sql");
assert!(ok, "select_complex parse failed: {err}");
assert!(tables >= 1, "should find source table");
}
#[test]
fn oracle_bulk_ctas_cte_star_using() {
let (ok, tables, _, err) = oracle_fixture_result("ctas_cte_star_using_oracle.sql");
assert!(ok, "ctas_cte_star_using parse failed: {err}");
assert!(tables >= 2, "should find target + sources");
}
#[test]
fn oracle_bulk_ctas_exists_star_subquery() {
let (ok, tables, _, err) = oracle_fixture_result("ctas_exists_star_subquery_oracle.sql");
assert!(ok, "ctas_exists_star_subquery parse failed: {err}");
assert!(tables >= 2, "should find target + sources");
}
#[test]
fn oracle_bulk_update_simple() {
let (ok, tables, _, err) = oracle_fixture_result("update_simple.sql");
assert!(ok, "update_simple parse failed: {err}");
assert!(tables >= 1, "should find target table");
}
#[test]
fn oracle_bulk_update_alias() {
let (ok, tables, _, err) = oracle_fixture_result("update_alias.sql");
assert!(ok, "update_alias parse failed: {err}");
assert!(tables >= 1, "should find target table");
}
#[test]
fn oracle_bulk_update_no_where() {
let (ok, tables, _, err) = oracle_fixture_result("update_no_where.sql");
assert!(ok, "update_no_where parse failed: {err}");
assert!(tables >= 1, "should find target table");
}
#[test]
fn oracle_bulk_update_case() {
let (ok, tables, _, err) = oracle_fixture_result("update_case.sql");
assert!(ok, "update_case parse failed: {err}");
assert!(tables >= 1, "should find target table");
}
#[test]
fn oracle_bulk_update_where_exists() {
let (ok, tables, _, err) = oracle_fixture_result("update_where_exists.sql");
assert!(ok, "update_where_exists parse failed: {err}");
assert!(tables >= 2, "should find target + source");
}
#[test]
fn oracle_bulk_update_one_subquery() {
let (ok, tables, edges, err) = oracle_fixture_result("update_one_subquery.sql");
assert!(ok, "update_one_subquery parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_update_subqueries() {
let (ok, tables, edges, err) = oracle_fixture_result("update_subqueries.sql");
assert!(ok, "update_subqueries parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_update_from_table_subquery() {
let (ok, tables, edges, err) = oracle_fixture_result("update_from_table_subquery.sql");
assert!(ok, "update_from_table_subquery parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_update_from_view() {
let (ok, tables, edges, err) = oracle_fixture_result("update_from_view.sql");
assert!(ok, "update_from_view parse failed: {err}");
assert!(tables >= 2, "should find target + view source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_update_join_correlated() {
let (ok, tables, edges, err) = oracle_fixture_result("update_join_correlated.sql");
assert!(ok, "update_join_correlated parse failed: {err}");
assert!(tables >= 3, "should find target + 2 source tables");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_update_star_inner_subquery() {
let (ok, tables, edges, err) = oracle_fixture_result("update_star_inner_subquery.sql");
assert!(ok, "update_star_inner_subquery parse failed: {err}");
assert!(tables >= 2, "should find target + source");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_update_star_join_subquery_meta() {
let (ok, tables, _, err) = oracle_fixture_result("update_star_join_subquery_meta.sql");
assert!(ok, "update_star_join_subquery_meta parse failed: {err}");
assert!(tables >= 3, "should find target + 2 sources");
}
#[test]
fn oracle_bulk_update_correlated_subquery() {
let (ok, tables, _, err) = oracle_fixture_result("update_correlated_subquery_oracle.sql");
assert!(ok, "update_correlated_subquery parse failed: {err}");
assert!(tables >= 2, "should find target + source");
}
#[test]
fn oracle_bulk_view_simple() {
let (ok, tables, edges, err) = oracle_fixture_result("view_simple.sql");
assert!(ok, "view_simple parse failed: {err}");
assert!(tables >= 2, "should find source tables");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_bare_star() {
let (ok, tables, edges, err) = oracle_fixture_result("view_bare_star.sql");
assert!(ok, "view_bare_star parse failed: {err}");
assert!(tables >= 1, "should find source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_table_star() {
let (ok, tables, edges, err) = oracle_fixture_result("view_table_star.sql");
assert!(ok, "view_table_star parse failed: {err}");
assert!(tables >= 1, "should find source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_column_list() {
let (ok, tables, edges, err) = oracle_fixture_result("view_column_list.sql");
assert!(ok, "view_column_list parse failed: {err}");
assert!(tables >= 1, "should find source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_or_replace() {
let (ok, tables, edges, err) = oracle_fixture_result("view_or_replace.sql");
assert!(ok, "view_or_replace parse failed: {err}");
assert!(tables >= 1, "should find source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_where_only() {
let (ok, tables, edges, err) = oracle_fixture_result("view_where_only.sql");
assert!(ok, "view_where_only parse failed: {err}");
assert!(tables >= 1, "should find source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_distinct() {
let (ok, tables, edges, err) = oracle_fixture_result("view_distinct.sql");
assert!(ok, "view_distinct parse failed: {err}");
assert!(tables >= 1, "should find source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_left_join() {
let (ok, tables, edges, err) = oracle_fixture_result("view_left_join.sql");
assert!(ok, "view_left_join parse failed: {err}");
assert!(tables >= 2, "should find 2 source tables");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_group_by() {
let (ok, tables, edges, err) = oracle_fixture_result("view_group_by.sql");
assert!(ok, "view_group_by parse failed: {err}");
assert!(tables >= 1, "should find source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_having() {
let (ok, tables, edges, err) = oracle_fixture_result("view_having.sql");
assert!(ok, "view_having parse failed: {err}");
assert!(tables >= 1, "should find source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_expr() {
let (ok, tables, edges, err) = oracle_fixture_result("view_expr.sql");
assert!(ok, "view_expr parse failed: {err}");
assert!(tables >= 1, "should find source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_row_number() {
let (ok, tables, edges, err) = oracle_fixture_result("view_row_number.sql");
assert!(ok, "view_row_number parse failed: {err}");
assert!(tables >= 1, "should find source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_correlated_subquery() {
let (ok, tables, edges, err) = oracle_fixture_result("view_correlated_subquery.sql");
assert!(ok, "view_correlated_subquery parse failed: {err}");
assert!(tables >= 1, "should find at least main source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_subquery_chain_explicit() {
let (ok, tables, edges, err) = oracle_fixture_result("view_subquery_chain_explicit.sql");
assert!(ok, "view_subquery_chain_explicit parse failed: {err}");
assert!(tables >= 1, "should find source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_union_all() {
let (ok, tables, edges, err) = oracle_fixture_result("view_union_all.sql");
assert!(ok, "view_union_all parse failed: {err}");
assert!(tables >= 2, "should find 2 source tables");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_union_all_diff() {
let (ok, tables, edges, err) = oracle_fixture_result("view_union_all_diff.sql");
assert!(ok, "view_union_all_diff parse failed: {err}");
assert!(tables >= 2, "should find 2 source tables");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_cte_union() {
let (ok, tables, edges, err) = oracle_fixture_result("view_cte_union.sql");
assert!(ok, "view_cte_union parse failed: {err}");
assert!(tables >= 2, "should find 2 source tables");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_literals_only() {
let (ok, _, _, err) = oracle_fixture_result("view_literals_only.sql");
assert!(ok, "view_literals_only parse failed: {err}");
}
#[test]
fn oracle_bulk_view_quoted() {
let (ok, tables, edges, err) = oracle_fixture_result("view_quoted.sql");
assert!(ok, "view_quoted parse failed: {err}");
assert!(tables >= 1, "should find source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_from_view() {
let (ok, tables, edges, err) = oracle_fixture_result("view_from_view.sql");
assert!(ok, "view_from_view parse failed: {err}");
assert!(tables >= 1, "should find source view");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_star_join() {
let (ok, tables, edges, err) = oracle_fixture_result("view_star_join.sql");
assert!(ok, "view_star_join parse failed: {err}");
assert!(tables >= 2, "should find 2 source tables");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_join_subquery_stars() {
let (ok, tables, edges, err) = oracle_fixture_result("view_join_subquery_stars.sql");
assert!(ok, "view_join_subquery_stars parse failed: {err}");
assert!(tables >= 2, "should find 2 source tables");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_scalar_subquery() {
let (ok, tables, edges, err) = oracle_fixture_result("view_scalar_subquery_oracle.sql");
assert!(ok, "view_scalar_subquery parse failed: {err}");
assert!(tables >= 1, "should find at least main source table");
assert!(edges > 0, "should have lineage edges");
}
#[test]
fn oracle_bulk_view_unqualified_cols() {
let (ok, tables, edges, err) = oracle_fixture_result("view_unqualified_cols_oracle.sql");
assert!(ok, "view_unqualified_cols parse failed: {err}");
assert!(tables >= 2, "should find 2 source tables");
assert!(edges > 0, "should have lineage edges");
}
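// Targeted lineage assertions on individual Oracle fixtures: CTE recognition,
// derivation edges, and column-level lineage.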
#[test]
fn oracle_insert_with_cte_chain_recognises_ctes() {
let sql = load_sql_fixture("oracle", "insert_cte_chain_deep.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let ctes = collect_cte_names(&result);
assert!(ctes.contains("c1"), "c1 should be a CTE node: {ctes:?}");
assert!(ctes.contains("c2"), "c2 should be a CTE node: {ctes:?}");
let tables = collect_table_names(&result);
assert!(
tables.iter().any(|t| t.contains("REG_SUBJECT")),
"base table IDM.REG_SUBJECT should appear in lineage: {tables:?}"
);
let cols = column_labels(&stmt);
assert!(
cols.iter().any(|c| c == "ID_SUBJECT" || c == "id_subject"),
"should have id_subject column: {cols:?}"
);
}
#[test]
fn oracle_insert_with_cte_sum_case_recognises_cte() {
let sql = load_sql_fixture("oracle", "insert_cte_sum_case.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let ctes = collect_cte_names(&result);
assert!(ctes.contains("s"), "CTE 's' should be recognised: {ctes:?}");
let tables = collect_table_names(&result);
assert!(
tables.iter().any(|t| t.contains("REG_SUBJECT")),
"base table should appear: {tables:?}"
);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
!derivations.is_empty(),
"SUM(CASE...) should produce a derivation edge"
);
}
#[test]
fn oracle_insert_with_cte_mixed_recognises_ctes() {
let sql = load_sql_fixture("oracle", "insert_cte_mixed.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let ctes = collect_cte_names(&result);
assert!(
ctes.contains("temp"),
"CTE 'temp' should be recognised: {ctes:?}"
);
assert!(
ctes.contains("temp2"),
"CTE 'temp2' should be recognised: {ctes:?}"
);
let tables = collect_table_names(&result);
assert!(
tables.iter().any(|t| t.contains("REG_SUBJECT")),
"base table REG_SUBJECT should appear: {tables:?}"
);
assert!(
tables.iter().any(|t| t.contains("REG_SUBJECTTYPE")),
"base table REG_SUBJECTTYPE should appear: {tables:?}"
);
}
#[test]
fn oracle_sysdate_not_treated_as_column() {
let sql = load_sql_fixture("oracle", "insert_bm_det_customer.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let sysdate_issues: Vec<_> = result
.issues
.iter()
.filter(|i| i.message.to_lowercase().contains("sysdate"))
.collect();
assert!(
sysdate_issues.is_empty(),
"SYSDATE should not trigger unresolved reference issues: {:?}",
sysdate_issues
.iter()
.map(|i| &i.message)
.collect::<Vec<_>>()
);
}
#[test]
fn oracle_insert_simple_column_lineage() {
let sql = load_sql_fixture("oracle", "insert_simple.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let dataflow = edges_by_type(&stmt, EdgeType::DataFlow);
let col_flows: Vec<_> = dataflow
.iter()
.filter(|e| {
let from = stmt.nodes.iter().find(|n| n.id == e.from);
from.map(|n| n.node_type == NodeType::Column)
.unwrap_or(false)
})
.collect();
assert!(
col_flows.len() >= 2,
"should have at least 2 column-level dataflow edges (id_subject, id_subjecttype): found {}",
col_flows.len()
);
}
#[test]
fn oracle_insert_case_expr_has_derivation() {
let sql = load_sql_fixture("oracle", "insert_case_expr.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
derivations.iter().any(|e| e
.expression
.as_deref()
.map(|expr| expr.contains("CASE"))
.unwrap_or(false)),
"should have a derivation edge with CASE expression"
);
let cols = column_labels(&stmt);
assert!(
cols.iter().any(|c| c == "CODE_NORM"),
"derived column CODE_NORM should appear: {cols:?}"
);
}
#[test]
fn oracle_view_group_by_has_count_derivation() {
let sql = load_sql_fixture("oracle", "view_group_by.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
derivations.iter().any(|e| e
.expression
.as_deref()
.map(|expr| expr.contains("COUNT"))
.unwrap_or(false)),
"should have COUNT(*) derivation edge"
);
let cols = column_labels(&stmt);
assert!(
cols.iter().any(|c| c == "CNT"),
"aggregated column CNT should appear: {cols:?}"
);
}
#[test]
fn oracle_view_row_number_has_window_derivation() {
let sql = load_sql_fixture("oracle", "view_row_number.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
derivations.iter().any(|e| e
.expression
.as_deref()
.map(|expr| expr.contains("ROW_NUMBER"))
.unwrap_or(false)),
"should have ROW_NUMBER() OVER derivation edge"
);
}
#[test]
fn oracle_view_expr_has_function_derivations() {
let sql = load_sql_fixture("oracle", "view_expr.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
derivations.iter().any(|e| e
.expression
.as_deref()
.map(|expr| expr.contains("UPPER"))
.unwrap_or(false)),
"should have UPPER() derivation edge"
);
assert!(
derivations.iter().any(|e| e
.expression
.as_deref()
.map(|expr| expr.contains("COALESCE"))
.unwrap_or(false)),
"should have COALESCE() derivation edge"
);
}
#[test]
fn oracle_view_cte_union_traces_through_cte() {
let sql = load_sql_fixture("oracle", "view_cte_union.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let ctes = collect_cte_names(&result);
assert!(ctes.contains("u"), "CTE 'u' should be present: {ctes:?}");
let tables = collect_table_names(&result);
assert!(
tables.iter().any(|t| t.contains("REG_SUBJECT")),
"base table REG_SUBJECT visible through CTE: {tables:?}"
);
assert!(
tables.iter().any(|t| t.contains("REG_SUBJECTTYPE")),
"base table REG_SUBJECTTYPE visible through CTE: {tables:?}"
);
}
#[test]
fn oracle_merge_minimal_column_lineage() {
let sql = load_sql_fixture("oracle", "merge_minimal.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let ctes = collect_cte_names(&result);
assert!(
ctes.contains("src"),
"MERGE USING subquery should create 'src' CTE: {ctes:?}"
);
let cols = column_labels(&stmt);
assert!(
cols.iter().any(|c| c == "ID_SUBJECT" || c == "id_subject"),
"should have id_subject in column lineage: {cols:?}"
);
}
#[test]
fn oracle_select_complex_cte_lineage() {
let sql = load_sql_fixture("oracle", "select_complex.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let ctes = collect_cte_names(&result);
assert!(ctes.contains("a"), "CTE 'a' should be present");
assert!(ctes.contains("b"), "CTE 'b' should be present");
let stmt = first_statement(&result);
let derivations = edges_by_type(&stmt, EdgeType::Derivation);
assert!(
derivations.len() >= 2,
"should have derivations for COUNT(*), CASE, ROW_NUMBER: found {}",
derivations.len()
);
}
#[test]
fn oracle_view_column_list_renames_columns() {
let sql = load_sql_fixture("oracle", "view_column_list.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let cols = column_labels(&stmt);
assert!(
cols.iter().any(|c| c == "SID"),
"view column list should rename ID_SUBJECT to SID: {cols:?}"
);
assert!(
cols.iter().any(|c| c == "TYP"),
"view column list should rename ID_SUBJECTTYPE to TYP: {cols:?}"
);
assert!(
!cols.iter().any(|c| c == "ID_SUBJECT"),
"original column name ID_SUBJECT should be renamed: {cols:?}"
);
assert!(
!cols.iter().any(|c| c == "ID_SUBJECTTYPE"),
"original column name ID_SUBJECTTYPE should be renamed: {cols:?}"
);
}
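// Cross-dialect sanity check: the same column-list rename behaviour under the
// generic dialect.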
#[test]
fn generic_view_column_list_renames_columns() {
let sql = "CREATE VIEW my_view (a, b) AS SELECT x, y FROM my_table;";
let result = run_analysis(sql, Dialect::Generic, None);
let stmt = first_statement(&result);
let cols = column_labels(&stmt);
assert!(
cols.iter().any(|c| c == "a"),
"view column list should rename x to a: {cols:?}"
);
assert!(
cols.iter().any(|c| c == "b"),
"view column list should rename y to b: {cols:?}"
);
}
#[test]
fn oracle_merge_update_set_target_columns() {
let sql = load_sql_fixture("oracle", "merge_minimal.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let cols = column_labels(&stmt);
assert!(
cols.iter().any(|c| c == "ID_SUBJECTTYPE"),
"MERGE UPDATE SET target column should appear: {cols:?}"
);
assert!(
cols.iter().any(|c| c == "ID_SUBJECT"),
"MERGE ON/INSERT columns should appear: {cols:?}"
);
}
#[test]
fn oracle_merge_insert_target_columns() {
let sql = "MERGE INTO target_t dst
USING (SELECT id, name FROM source_t) src
ON (dst.id = src.id)
WHEN NOT MATCHED THEN INSERT (id, name) VALUES (src.id, src.name);";
let result = run_analysis(sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let cols = column_labels(&stmt);
assert!(
cols.iter().any(|c| c == "ID"),
"MERGE INSERT target column ID should appear: {cols:?}"
);
assert!(
cols.iter().any(|c| c == "NAME"),
"MERGE INSERT target column NAME should appear: {cols:?}"
);
}
#[test]
fn oracle_merge_direct_table_has_column_nodes() {
let sql = load_sql_fixture("oracle", "merge_using_direct_table.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let cols = column_labels(&stmt);
assert!(
cols.iter().any(|c| c == "ID_SUBJECTTYPE"),
"MERGE SET target column should appear: {cols:?}"
);
assert!(
cols.iter().any(|c| c == "ID_SUBJECT"),
"MERGE INSERT target column should appear: {cols:?}"
);
let ownership = edges_by_type(&stmt, EdgeType::Ownership);
assert!(
ownership.len() >= 2,
"should have ownership edges for target columns: found {}",
ownership.len()
);
}
#[test]
fn oracle_merge_matched_only_has_set_column_nodes() {
let sql = load_sql_fixture("oracle", "merge_when_matched_only.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let cols = column_labels(&stmt);
assert!(
!cols.is_empty(),
"MERGE matched-only should have column nodes from SET targets: {cols:?}"
);
let ownership = edges_by_type(&stmt, EdgeType::Ownership);
assert!(
!ownership.is_empty(),
"should have ownership edges for SET target columns"
);
}
#[test]
fn oracle_merge_not_matched_only_has_insert_column_nodes() {
let sql = load_sql_fixture("oracle", "merge_when_not_matched_only.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let stmt = first_statement(&result);
let cols = column_labels(&stmt);
assert!(
!cols.is_empty(),
"MERGE not-matched-only should have column nodes from INSERT targets: {cols:?}"
);
let ownership = edges_by_type(&stmt, EdgeType::Ownership);
assert!(
!ownership.is_empty(),
"should have ownership edges for INSERT target columns"
);
}
#[test]
fn oracle_insert_no_target_fields_with_schema() {
assert_oracle_no_unresolved_with_schema("insert_no_target_fields.sql");
}
#[test]
fn oracle_insert_positional_no_aliases_with_schema() {
assert_oracle_no_unresolved_with_schema("insert_positional_no_aliases_oracle.sql");
}
#[test]
fn oracle_insert_target_source_with_schema() {
assert_oracle_no_unresolved_with_schema("insert_target_source.sql");
}
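// Shared Oracle schema metadata, deserialized from the JSON fixture at
// tests/fixtures/schemas/oracle_sample.json.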
fn oracle_schema() -> SchemaMetadata {
    let path = fixtures_root().join("schemas").join("oracle_sample.json");
    let content =
        fs::read_to_string(&path).unwrap_or_else(|e| panic!("failed to read {path:?}: {e}"));
    serde_json::from_str(&content).unwrap_or_else(|e| panic!("failed to parse {path:?}: {e}"))
}
fn issue_codes_set(result: &AnalyzeResult) -> HashSet<String> {
result.issues.iter().map(|i| i.code.clone()).collect()
}
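// Star expansion: without schema metadata a bare star can only be traced
// approximately; with the sample schema it should expand to concrete columns
// and emit no APPROXIMATE_LINEAGE issue.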
#[test]
fn oracle_star_without_schema_is_approximate() {
let sql = load_sql_fixture("oracle", "insert_bare_star.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
let codes = issue_codes_set(&result);
assert!(
codes.contains(issue_codes::APPROXIMATE_LINEAGE),
"bare star without schema should emit APPROXIMATE_LINEAGE, issues: {:?}",
result.issues
);
}
// Runs a fixture against the shared Oracle schema and asserts every star was
// expanded (no APPROXIMATE_LINEAGE); returns the result for follow-up checks.
fn assert_oracle_star_expands_with_schema(fixture: &str) -> AnalyzeResult {
    let sql = load_sql_fixture("oracle", fixture);
    let schema = oracle_schema();
    let result = run_analysis(&sql, Dialect::Oracle, Some(schema));
    let approx: Vec<_> = result
        .issues
        .iter()
        .filter(|i| i.code == issue_codes::APPROXIMATE_LINEAGE)
        .collect();
    assert!(
        approx.is_empty(),
        "{fixture}: with schema, star should expand without APPROXIMATE_LINEAGE: {approx:?}"
    );
    result
}
#[test]
fn oracle_star_with_schema_expands_insert_bare_star() {
    let result = assert_oracle_star_expands_with_schema("insert_bare_star.sql");
    let stmt = first_statement(&result);
    let col_nodes: Vec<_> = stmt
        .nodes
        .iter()
        .filter(|n| n.node_type == NodeType::Column)
        .collect();
    assert!(
        col_nodes.len() >= 2,
        "expanded star should produce column nodes: found {}",
        col_nodes.len()
    );
}
#[test]
fn oracle_star_with_schema_expands_insert_table_star() {
    assert_oracle_star_expands_with_schema("insert_table_star.sql");
}
#[test]
fn oracle_star_with_schema_expands_view_bare_star() {
    assert_oracle_star_expands_with_schema("view_bare_star.sql");
}
#[test]
fn oracle_star_with_schema_expands_view_table_star() {
    assert_oracle_star_expands_with_schema("view_table_star.sql");
}
#[test]
fn oracle_star_with_schema_expands_view_star_join() {
    let result = assert_oracle_star_expands_with_schema("view_star_join.sql");
    let stmt = first_statement(&result);
    let col_nodes: Vec<_> = stmt
        .nodes
        .iter()
        .filter(|n| n.node_type == NodeType::Column)
        .collect();
    assert!(
        col_nodes.len() >= 4,
        "star over JOIN should expand columns from both tables: found {}",
        col_nodes.len()
    );
}
#[test]
fn oracle_star_with_schema_expands_merge_using_star() {
    assert_oracle_star_expands_with_schema("merge_using_star_subquery.sql");
}
#[test]
fn oracle_star_with_schema_expands_merge_table_star() {
    assert_oracle_star_expands_with_schema("merge_using_table_star.sql");
}
#[test]
fn oracle_star_with_schema_expands_insert_from_subquery_star() {
    assert_oracle_star_expands_with_schema("insert_from_subquery_star.sql");
}
#[test]
fn oracle_star_with_schema_expands_insert_join_subquery_stars() {
    assert_oracle_star_expands_with_schema("insert_join_subquery_stars.sql");
}
#[test]
fn oracle_star_with_schema_expands_insert_star_minus() {
    assert_oracle_star_expands_with_schema("insert_star_minus_oracle.sql");
}
#[test]
fn oracle_star_with_schema_expands_insert_table_star_join_meta() {
    assert_oracle_star_expands_with_schema("insert_table_star_join_meta.sql");
}
#[test]
fn oracle_star_with_schema_expands_merge_star_join_meta() {
    assert_oracle_star_expands_with_schema("merge_using_star_join_meta.sql");
}
#[test]
fn oracle_star_with_schema_expands_merge_cte_star_meta() {
    assert_oracle_star_expands_with_schema("merge_using_cte_star_meta.sql");
}
#[test]
fn oracle_star_with_schema_expands_update_star_inner_subquery() {
    assert_oracle_star_expands_with_schema("update_star_inner_subquery.sql");
}
#[test]
fn oracle_star_with_schema_expands_update_star_join_subquery_meta() {
    assert_oracle_star_expands_with_schema("update_star_join_subquery_meta.sql");
}
#[test]
fn oracle_star_with_schema_expands_view_join_subquery_stars() {
    assert_oracle_star_expands_with_schema("view_join_subquery_stars.sql");
}
#[test]
fn oracle_star_with_schema_expands_insert_star_source() {
    assert_oracle_star_expands_with_schema("insert_star_source.sql");
}
#[test]
fn oracle_star_with_schema_expands_ctas_cte_star() {
    assert_oracle_star_expands_with_schema("ctas_cte_star_using_oracle.sql");
}
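// UPDATE lineage: SET targets should surface as column nodes even without
// schema metadata.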
#[test]
fn oracle_update_simple_column_lineage() {
let sql = load_sql_fixture("oracle", "update_simple.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
assert!(
!result.summary.has_errors,
"should parse: {:?}",
result.issues
);
let stmt = first_statement(&result);
let col_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Column)
.collect();
assert!(
!col_nodes.is_empty(),
"UPDATE SET target should produce column nodes, got 0"
);
let col_names: Vec<&str> = col_nodes.iter().map(|n| n.label.as_ref()).collect();
assert!(
col_names.iter().any(|n| n.contains("CODE")),
"should have CODE column from SET target, found: {col_names:?}"
);
}
#[test]
fn oracle_update_case_has_column_lineage() {
let sql = load_sql_fixture("oracle", "update_case.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
assert!(
!result.summary.has_errors,
"should parse: {:?}",
result.issues
);
let stmt = first_statement(&result);
let col_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Column)
.collect();
assert!(
!col_nodes.is_empty(),
"UPDATE CASE should produce column nodes, got 0"
);
let col_names: Vec<&str> = col_nodes.iter().map(|n| n.label.as_ref()).collect();
assert!(
col_names.iter().any(|n| n.contains("CODE")),
"should have CODE column from SET target, found: {col_names:?}"
);
}
#[test]
fn oracle_update_alias_column_lineage() {
let sql = load_sql_fixture("oracle", "update_alias.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
assert!(
!result.summary.has_errors,
"should parse: {:?}",
result.issues
);
let stmt = first_statement(&result);
let col_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Column)
.collect();
assert!(
!col_nodes.is_empty(),
"UPDATE with alias SET target should produce column nodes, got 0"
);
}
#[test]
fn oracle_update_no_where_column_lineage() {
let sql = load_sql_fixture("oracle", "update_no_where.sql");
let result = run_analysis(&sql, Dialect::Oracle, None);
assert!(
!result.summary.has_errors,
"should parse: {:?}",
result.issues
);
let stmt = first_statement(&result);
let col_nodes: Vec<_> = stmt
.nodes
.iter()
.filter(|n| n.node_type == NodeType::Column)
.collect();
assert!(
!col_nodes.is_empty(),
"UPDATE without WHERE should produce column nodes, got 0"
);
}
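// With schema metadata supplied, unqualified column references in the fixture
// should all resolve, so no UNRESOLVED_REFERENCE issue may remain.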
fn assert_oracle_no_unresolved_with_schema(fixture: &str) {
let sql = load_sql_fixture("oracle", fixture);
let schema = oracle_schema();
let result = run_analysis(&sql, Dialect::Oracle, Some(schema));
assert!(
!result.summary.has_errors,
"should parse {fixture}: {:?}",
result.issues
);
let unresolved: Vec<_> = result
.issues
.iter()
.filter(|i| i.code == issue_codes::UNRESOLVED_REFERENCE)
.collect();
assert!(
unresolved.is_empty(),
"{fixture}: with schema, unqualified cols should resolve — no UNRESOLVED_REFERENCE: {unresolved:?}"
);
}
#[test]
fn oracle_view_unqualified_cols_with_schema() {
assert_oracle_no_unresolved_with_schema("view_unqualified_cols_oracle.sql");
}
#[test]
fn oracle_insert_scalar_agg_with_schema() {
assert_oracle_no_unresolved_with_schema("insert_scalar_agg_multi_tables_oracle.sql");
}
#[test]
fn oracle_merge_complex_unqualified_with_schema() {
assert_oracle_no_unresolved_with_schema("merge_complex_unqualified_oracle.sql");
}
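// DUAL is Oracle's built-in one-row dummy table; it should be suppressed from
// lineage rather than reported as a real source table.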
fn assert_oracle_dual_suppressed(fixture: &str) {
let sql = load_sql_fixture("oracle", fixture);
let schema = oracle_schema();
let result = run_analysis(&sql, Dialect::Oracle, Some(schema));
assert!(
!result.summary.has_errors,
"should parse {fixture}: {:?}",
result.issues
);
let tables = collect_table_names(&result);
let has_dual = tables.iter().any(|t| t.eq_ignore_ascii_case("DUAL"));
assert!(
!has_dual,
"{fixture}: DUAL should not appear in table nodes: {tables:?}"
);
let unresolved: Vec<_> = result
.issues
.iter()
.filter(|i| i.code == issue_codes::UNRESOLVED_REFERENCE)
.collect();
assert!(
unresolved.is_empty(),
"{fixture}: should have no UNRESOLVED_REFERENCE with DUAL: {unresolved:?}"
);
}
#[test]
fn oracle_dual_suppressed_in_insert_select_literals() {
assert_oracle_dual_suppressed("insert_select_literals.sql");
}
#[test]
fn oracle_dual_suppressed_in_view_literals_only() {
assert_oracle_dual_suppressed("view_literals_only.sql");
}
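// A minimal inline sketch of the same suppression, assuming it also applies to
// ad-hoc SQL and not only to the fixture corpus (the test name and the inline
// statement below are illustrative, not fixtures):
#[test]
fn oracle_dual_suppressed_inline_sketch() {
    let result = run_analysis(
        "SELECT 1 AS one FROM DUAL",
        Dialect::Oracle,
        Some(oracle_schema()),
    );
    let tables = collect_table_names(&result);
    assert!(
        !tables.iter().any(|t| t.eq_ignore_ascii_case("DUAL")),
        "DUAL should not appear as a table node: {tables:?}"
    );
}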
#[test]
fn oracle_merge_union_all_with_schema() {
assert_oracle_no_unresolved_with_schema("merge_union_all.sql");
}
#[test]
fn oracle_merge_into_subquery_target_with_schema() {
assert_oracle_no_unresolved_with_schema("merge_into_subquery_target.sql");
}
#[test]
fn oracle_update_from_view_with_schema() {
assert_oracle_no_unresolved_with_schema("update_from_view.sql");
}
#[test]
fn oracle_insert_from_view_with_schema() {
assert_oracle_no_unresolved_with_schema("insert_from_view.sql");
}
#[test]
fn oracle_view_from_view_with_schema() {
assert_oracle_no_unresolved_with_schema("view_from_view.sql");
}