use flowscope_core::analyze;
use flowscope_core::types::{
issue_codes, AnalysisOptions, AnalyzeRequest, Dialect, SchemaMetadata, Severity,
};
/// Counts the issues in `result` that have `Severity::Warning` and also satisfy `predicate`.
///
/// The severity check short-circuits, so `predicate` is only invoked for warning-level issues.
fn count_warnings<F>(result: &flowscope_core::types::AnalyzeResult, predicate: F) -> usize
where
    F: Fn(&flowscope_core::types::Issue) -> bool,
{
    let mut matching = 0;
    for issue in result.issues.iter() {
        if issue.severity == Severity::Warning && predicate(issue) {
            matching += 1;
        }
    }
    matching
}
/// Checks that a SELECT alias referenced from `GROUP BY` under the MySQL dialect analyzes into
/// one statement and produces no `UNSUPPORTED_SYNTAX` warning mentioning "GROUP BY".
#[test]
fn test_alias_in_group_by_mysql_no_crash() {
    // Predicate selecting the GROUP BY flavoured unsupported-syntax issues.
    fn is_group_by_issue(issue: &flowscope_core::types::Issue) -> bool {
        issue.code == issue_codes::UNSUPPORTED_SYNTAX && issue.message.contains("GROUP BY")
    }

    let lineage_options = AnalysisOptions {
        enable_column_lineage: Some(true),
        ..Default::default()
    };
    let request = AnalyzeRequest {
        sql: String::from("SELECT x + y AS sum FROM t GROUP BY sum"),
        files: None,
        dialect: Dialect::Mysql,
        source_name: Some(String::from("test_group_by_mysql")),
        options: Some(lineage_options),
        schema: None,
        #[cfg(feature = "templating")]
        template_config: None,
    };

    let analysis = analyze(&request);
    assert_eq!(analysis.statements.len(), 1);

    let group_by_warnings = count_warnings(&analysis, is_group_by_issue);
    assert_eq!(
        group_by_warnings, 0,
        "MySQL should not warn about alias in GROUP BY: {:?}",
        analysis.issues
    );
}
/// Smoke test: a SELECT alias referenced from `GROUP BY` under the Postgres dialect must still
/// analyze into exactly one statement. Nothing is asserted about the reported issues.
#[test]
fn test_alias_in_group_by_postgres_no_crash() {
    let request = AnalyzeRequest {
        sql: "SELECT x + y AS sum FROM t GROUP BY sum".to_owned(),
        files: None,
        dialect: Dialect::Postgres,
        source_name: Some("test_group_by_postgres".to_owned()),
        options: Some(AnalysisOptions {
            enable_column_lineage: Some(true),
            ..AnalysisOptions::default()
        }),
        schema: None,
        #[cfg(feature = "templating")]
        template_config: None,
    };

    let analysis = analyze(&request);
    let statement_count = analysis.statements.len();
    assert_eq!(statement_count, 1);
}
/// Checks that, under the MySQL dialect, referencing the SELECT alias `cnt` from `HAVING`
/// yields no `UNSUPPORTED_SYNTAX` warning whose message mentions "HAVING".
#[test]
fn test_alias_in_having_mysql_allowed() {
    let query = "SELECT COUNT(*) AS cnt FROM t GROUP BY x HAVING cnt > 5";
    let options = AnalysisOptions {
        enable_column_lineage: Some(true),
        ..Default::default()
    };
    let request = AnalyzeRequest {
        sql: query.to_owned(),
        files: None,
        dialect: Dialect::Mysql,
        source_name: Some("test_having_mysql".to_owned()),
        options: Some(options),
        schema: None,
        #[cfg(feature = "templating")]
        template_config: None,
    };

    let outcome = analyze(&request);
    let having_warnings = count_warnings(&outcome, |candidate| {
        candidate.code == issue_codes::UNSUPPORTED_SYNTAX && candidate.message.contains("HAVING")
    });
    assert_eq!(
        having_warnings, 0,
        "MySQL should not warn about alias in HAVING: {:?}",
        outcome.issues
    );
}
/// Checks that, under the Postgres dialect, referencing the SELECT alias `cnt` from `HAVING`
/// yields exactly one `UNSUPPORTED_SYNTAX` warning mentioning "HAVING", and that the matching
/// issue's message names the alias.
#[test]
fn test_alias_in_having_postgres_warned() {
    // Shared predicate: unsupported-syntax issue whose message talks about HAVING.
    fn is_having_issue(issue: &flowscope_core::types::Issue) -> bool {
        issue.code == issue_codes::UNSUPPORTED_SYNTAX && issue.message.contains("HAVING")
    }

    let options = AnalysisOptions {
        enable_column_lineage: Some(true),
        ..Default::default()
    };
    let request = AnalyzeRequest {
        sql: String::from("SELECT COUNT(*) AS cnt FROM t GROUP BY x HAVING cnt > 5"),
        files: None,
        dialect: Dialect::Postgres,
        source_name: Some(String::from("test_having_postgres")),
        options: Some(options),
        schema: None,
        #[cfg(feature = "templating")]
        template_config: None,
    };

    let analysis = analyze(&request);

    let having_warning_count = count_warnings(&analysis, is_having_issue);
    assert_eq!(
        having_warning_count, 1,
        "PostgreSQL should warn about alias 'cnt' in HAVING: {:?}",
        analysis.issues
    );

    // This lookup matches on code and message only; unlike the count above it does not
    // filter by severity.
    let having_issue = analysis
        .issues
        .iter()
        .find(|issue| is_having_issue(issue))
        .expect("Should have a HAVING warning");
    assert!(
        having_issue.message.contains("cnt"),
        "Warning should mention the alias 'cnt': {}",
        having_issue.message
    );
}
/// Checks that, under the Snowflake dialect, referencing the SELECT alias `total` from `HAVING`
/// yields exactly one `UNSUPPORTED_SYNTAX` warning mentioning "HAVING".
#[test]
fn test_alias_in_having_snowflake_warned() {
    let query = "SELECT SUM(amount) AS total FROM orders GROUP BY customer_id HAVING total > 1000";
    let request = AnalyzeRequest {
        sql: query.to_owned(),
        files: None,
        dialect: Dialect::Snowflake,
        source_name: Some("test_having_snowflake".to_owned()),
        options: Some(AnalysisOptions {
            enable_column_lineage: Some(true),
            ..AnalysisOptions::default()
        }),
        schema: None,
        #[cfg(feature = "templating")]
        template_config: None,
    };

    let outcome = analyze(&request);
    let having_warning_count = count_warnings(&outcome, |candidate| {
        candidate.code == issue_codes::UNSUPPORTED_SYNTAX && candidate.message.contains("HAVING")
    });
    assert_eq!(
        having_warning_count, 1,
        "Snowflake should warn about alias 'total' in HAVING: {:?}",
        outcome.issues
    );
}
/// Checks that referencing a SELECT alias from `ORDER BY` yields no `UNSUPPORTED_SYNTAX`
/// warning mentioning "ORDER BY".
///
/// NOTE(review): despite the `all_dialects` name, only `Dialect::Postgres` is exercised here.
#[test]
fn test_alias_in_order_by_all_dialects_allowed() {
    fn is_order_by_issue(issue: &flowscope_core::types::Issue) -> bool {
        issue.code == issue_codes::UNSUPPORTED_SYNTAX && issue.message.contains("ORDER BY")
    }

    let options = AnalysisOptions {
        enable_column_lineage: Some(true),
        ..Default::default()
    };
    let request = AnalyzeRequest {
        sql: String::from("SELECT x + y AS sum FROM t ORDER BY sum"),
        files: None,
        dialect: Dialect::Postgres,
        source_name: Some(String::from("test_order_by_postgres")),
        options: Some(options),
        schema: None,
        #[cfg(feature = "templating")]
        template_config: None,
    };

    let analysis = analyze(&request);
    let order_by_warnings = count_warnings(&analysis, is_order_by_issue);
    assert_eq!(
        order_by_warnings, 0,
        "Postgres should not warn about alias in ORDER BY: {:?}",
        analysis.issues
    );
}
/// Checks that, under the BigQuery dialect, a SELECT item that references an earlier alias in
/// the same SELECT list (`sum * 2` after `x + y AS sum`) yields no `UNSUPPORTED_SYNTAX`
/// warning mentioning "lateral column alias".
#[test]
fn test_lateral_column_alias_bigquery_allowed() {
    let query = "SELECT x + y AS sum, sum * 2 AS double_sum FROM t";
    let options = AnalysisOptions {
        enable_column_lineage: Some(true),
        ..Default::default()
    };
    let request = AnalyzeRequest {
        sql: query.to_owned(),
        files: None,
        dialect: Dialect::Bigquery,
        source_name: Some("test_lateral_bigquery".to_owned()),
        options: Some(options),
        schema: None,
        #[cfg(feature = "templating")]
        template_config: None,
    };

    let outcome = analyze(&request);
    let lateral_warning_count = count_warnings(&outcome, |candidate| {
        candidate.code == issue_codes::UNSUPPORTED_SYNTAX
            && candidate.message.contains("lateral column alias")
    });
    assert_eq!(
        lateral_warning_count, 0,
        "BigQuery should not warn about lateral column alias: {:?}",
        outcome.issues
    );
}
/// Checks that, under the Postgres dialect, a SELECT item referencing the earlier alias `sum`
/// yields exactly one `UNSUPPORTED_SYNTAX` warning mentioning "lateral column alias", and that
/// the matching issue's message names the alias.
#[test]
fn test_lateral_column_alias_postgres_warned() {
    // Shared predicate: unsupported-syntax issue about a lateral column alias.
    fn is_lateral_alias_issue(issue: &flowscope_core::types::Issue) -> bool {
        issue.code == issue_codes::UNSUPPORTED_SYNTAX
            && issue.message.contains("lateral column alias")
    }

    let options = AnalysisOptions {
        enable_column_lineage: Some(true),
        ..Default::default()
    };
    let request = AnalyzeRequest {
        sql: String::from("SELECT x + y AS sum, sum * 2 AS double_sum FROM t"),
        files: None,
        dialect: Dialect::Postgres,
        source_name: Some(String::from("test_lateral_postgres")),
        options: Some(options),
        schema: None,
        #[cfg(feature = "templating")]
        template_config: None,
    };

    let analysis = analyze(&request);

    let lateral_warning_count = count_warnings(&analysis, is_lateral_alias_issue);
    assert_eq!(
        lateral_warning_count, 1,
        "PostgreSQL should warn about lateral column alias 'sum': {:?}",
        analysis.issues
    );

    // This lookup matches on code and message only; it does not filter by severity.
    let lateral_issue = analysis
        .issues
        .iter()
        .find(|issue| is_lateral_alias_issue(issue))
        .expect("Should have a lateral column alias warning");
    assert!(
        lateral_issue.message.contains("sum"),
        "Warning should mention the alias 'sum': {}",
        lateral_issue.message
    );
}
/// Checks that, under the MySQL dialect, a SELECT item referencing the earlier alias `total`
/// yields exactly one `UNSUPPORTED_SYNTAX` warning mentioning "lateral column alias".
#[test]
fn test_lateral_column_alias_mysql_warned() {
    let query = "SELECT price * quantity AS total, total * tax_rate AS tax FROM orders";
    let request = AnalyzeRequest {
        sql: query.to_owned(),
        files: None,
        dialect: Dialect::Mysql,
        source_name: Some("test_lateral_mysql".to_owned()),
        options: Some(AnalysisOptions {
            enable_column_lineage: Some(true),
            ..AnalysisOptions::default()
        }),
        schema: None,
        #[cfg(feature = "templating")]
        template_config: None,
    };

    let outcome = analyze(&request);
    let lateral_warning_count = count_warnings(&outcome, |candidate| {
        candidate.code == issue_codes::UNSUPPORTED_SYNTAX
            && candidate.message.contains("lateral column alias")
    });
    assert_eq!(
        lateral_warning_count, 1,
        "MySQL should warn about lateral column alias 'total': {:?}",
        outcome.issues
    );
}
/// Checks that, under the Snowflake dialect, a SELECT item referencing the earlier alias `sum`
/// yields no `UNSUPPORTED_SYNTAX` warning mentioning "lateral column alias".
#[test]
fn test_lateral_column_alias_snowflake_allowed() {
    fn is_lateral_alias_issue(issue: &flowscope_core::types::Issue) -> bool {
        issue.code == issue_codes::UNSUPPORTED_SYNTAX
            && issue.message.contains("lateral column alias")
    }

    let options = AnalysisOptions {
        enable_column_lineage: Some(true),
        ..Default::default()
    };
    let request = AnalyzeRequest {
        sql: String::from("SELECT a + b AS sum, sum / 2 AS half FROM t"),
        files: None,
        dialect: Dialect::Snowflake,
        source_name: Some(String::from("test_lateral_snowflake")),
        options: Some(options),
        schema: None,
        #[cfg(feature = "templating")]
        template_config: None,
    };

    let analysis = analyze(&request);
    let lateral_warning_count = count_warnings(&analysis, is_lateral_alias_issue);
    assert_eq!(
        lateral_warning_count, 0,
        "Snowflake should not warn about lateral column alias: {:?}",
        analysis.issues
    );
}
/// Checks that a query with a single aliased SELECT item (`x AS a`) yields no
/// `UNSUPPORTED_SYNTAX` warning mentioning "lateral column alias" under the Postgres dialect.
#[test]
fn test_no_lateral_warning_for_first_item() {
    let query = "SELECT x AS a FROM t";
    let options = AnalysisOptions {
        enable_column_lineage: Some(true),
        ..Default::default()
    };
    let request = AnalyzeRequest {
        sql: query.to_owned(),
        files: None,
        dialect: Dialect::Postgres,
        source_name: Some("test_no_lateral_first".to_owned()),
        options: Some(options),
        schema: None,
        #[cfg(feature = "templating")]
        template_config: None,
    };

    let outcome = analyze(&request);
    let lateral_warning_count = count_warnings(&outcome, |candidate| {
        candidate.code == issue_codes::UNSUPPORTED_SYNTAX
            && candidate.message.contains("lateral column alias")
    });
    assert_eq!(
        lateral_warning_count, 0,
        "Should not warn for first SELECT item: {:?}",
        outcome.issues
    );
}
/// Checks that a chain of alias references (`x AS y` using alias `x`, `y AS z` using alias `y`)
/// under the Postgres dialect yields exactly two `UNSUPPORTED_SYNTAX` warnings mentioning
/// "lateral column alias".
#[test]
fn test_multiple_lateral_violations() {
    fn is_lateral_alias_issue(issue: &flowscope_core::types::Issue) -> bool {
        issue.code == issue_codes::UNSUPPORTED_SYNTAX
            && issue.message.contains("lateral column alias")
    }

    let request = AnalyzeRequest {
        sql: String::from("SELECT a AS x, x AS y, y AS z FROM t"),
        files: None,
        dialect: Dialect::Postgres,
        source_name: Some(String::from("test_multiple_lateral")),
        options: Some(AnalysisOptions {
            enable_column_lineage: Some(true),
            ..AnalysisOptions::default()
        }),
        schema: None,
        #[cfg(feature = "templating")]
        template_config: None,
    };

    let analysis = analyze(&request);
    let lateral_warning_count = count_warnings(&analysis, is_lateral_alias_issue);
    assert_eq!(
        lateral_warning_count, 2,
        "Should have warnings for both 'x' and 'y': {:?}",
        analysis.issues
    );
}
/// Checks table-alias scoping when a subquery reuses the outer alias name.
///
/// The outer query aliases `t1` as `a`; the `EXISTS` subquery aliases `t2` as `a`. The test
/// expects both tables to appear as nodes, no issues at all, at least one data-flow edge
/// starting at a column owned by `t1`, and no data-flow edge starting at a column owned by `t2`.
#[test]
fn test_alias_shadowing_in_subquery() {
    use flowscope_core::types::EdgeType;

    // The literal is kept flush-left so the SQL text handed to the analyzer is unchanged.
    let sql = "
SELECT a.id
FROM t1 AS a
WHERE EXISTS (
SELECT 1 FROM t2 AS a WHERE a.id = 10 -- Inner 'a' is t2
)
AND a.id = 20 -- Outer 'a' should be t1
";
    let options = AnalysisOptions {
        enable_column_lineage: Some(true),
        ..Default::default()
    };
    let request = AnalyzeRequest {
        sql: sql.to_owned(),
        files: None,
        dialect: Dialect::Postgres,
        source_name: Some("test_scoping".to_owned()),
        options: Some(options),
        schema: None,
        #[cfg(feature = "templating")]
        template_config: None,
    };
    let result = analyze(&request);

    // Both tables must be represented in the first (only) statement.
    let t1_nodes: Vec<_> = result
        .nodes_in_statement(0)
        .filter(|node| &*node.label == "t1")
        .collect();
    let t2_nodes: Vec<_> = result
        .nodes_in_statement(0)
        .filter(|node| &*node.label == "t2")
        .collect();
    assert!(!t1_nodes.is_empty(), "t1 should be present");
    assert!(!t2_nodes.is_empty(), "t2 should be present");
    assert!(
        result.issues.is_empty(),
        "Should be no analysis issues: {:?}",
        result.issues
    );

    let edges: Vec<_> = result.edges_in_statement(0).collect();

    // Columns owned by t1 (targets of Ownership edges leaving the t1 node).
    let t1_id = &t1_nodes[0].id;
    let t1_cols: Vec<_> = edges
        .iter()
        .filter(|edge| edge.edge_type == EdgeType::Ownership && edge.from == *t1_id)
        .map(|edge| &edge.to)
        .collect();
    assert!(!t1_cols.is_empty(), "t1 should have columns");
    let t1_feeds_output = edges
        .iter()
        .any(|edge| edge.edge_type == EdgeType::DataFlow && t1_cols.contains(&&edge.from));
    assert!(t1_feeds_output, "Output should flow from t1");

    // Columns owned by t2 must not be the source of any DataFlow edge.
    let t2_id = &t2_nodes[0].id;
    let t2_cols: Vec<_> = edges
        .iter()
        .filter(|edge| edge.edge_type == EdgeType::Ownership && edge.from == *t2_id)
        .map(|edge| &edge.to)
        .collect();
    let t2_feeds_output = edges
        .iter()
        .any(|edge| edge.edge_type == EdgeType::DataFlow && t2_cols.contains(&&edge.from));
    assert!(
        !t2_feeds_output,
        "Output should NOT flow from t2 (it is only in WHERE clause)"
    );
}
/// Checks that a table created earlier in the same script is treated as known when the schema
/// metadata sets `allow_implied: false`.
///
/// Expects two statements, no `UNRESOLVED_REFERENCE` issue, exactly one table node labelled
/// `foo` in the SELECT statement, and no "placeholder" key in that node's metadata.
#[test]
fn new_tables_are_known_when_implied_schema_disabled() {
    use flowscope_core::types::NodeType;

    // The literal is kept flush-left so the SQL text handed to the analyzer is unchanged.
    let sql = "
CREATE TABLE foo (id INT);
SELECT * FROM foo;
";
    let options = AnalysisOptions {
        enable_column_lineage: Some(true),
        ..Default::default()
    };
    let schema = SchemaMetadata {
        default_schema: Some("public".to_owned()),
        allow_implied: false,
        ..Default::default()
    };
    let request = AnalyzeRequest {
        sql: sql.to_owned(),
        files: None,
        dialect: Dialect::Postgres,
        source_name: Some("test_implied_disabled".to_owned()),
        options: Some(options),
        schema: Some(schema),
        #[cfg(feature = "templating")]
        template_config: None,
    };
    let result = analyze(&request);
    assert_eq!(result.statements.len(), 2, "Expected CREATE + SELECT");

    let has_unresolved = result
        .issues
        .iter()
        .any(|issue| issue.code == issue_codes::UNRESOLVED_REFERENCE);
    assert!(
        !has_unresolved,
        "Should not warn about unresolved tables: {:?}",
        result.issues
    );

    // Statement index 1 is the SELECT.
    let select_tables: Vec<_> = result
        .nodes_in_statement(1)
        .filter(|node| node.node_type == NodeType::Table)
        .collect();
    assert_eq!(select_tables.len(), 1, "SELECT should reference foo once");

    let foo_node = &select_tables[0];
    assert_eq!(&*foo_node.label, "foo");

    // Absent metadata counts as "not a placeholder".
    let marked_placeholder = match foo_node.metadata.as_ref() {
        Some(metadata) => metadata.contains_key("placeholder"),
        None => false,
    };
    assert!(
        !marked_placeholder,
        "Table node should not be marked as placeholder"
    );
}
/// Checks that a reference to a table nobody defined is still reported when another table is
/// known from an earlier `CREATE TABLE ... AS` and the schema sets `allow_implied: false`.
///
/// Expects two statements and exactly one warning-severity `UNRESOLVED_REFERENCE` issue whose
/// message mentions `missing_table`.
#[test]
fn missing_table_warned_when_other_tables_known() {
    // The literal is kept flush-left so the SQL text handed to the analyzer is unchanged.
    let sql = "
CREATE TABLE foo AS SELECT 1 as id;
SELECT * FROM missing_table;
";
    let request = AnalyzeRequest {
        sql: sql.to_string(),
        files: None,
        dialect: Dialect::Postgres,
        source_name: Some("test_missing_warned".to_string()),
        options: Some(AnalysisOptions {
            enable_column_lineage: Some(true),
            ..Default::default()
        }),
        schema: Some(SchemaMetadata {
            default_schema: Some("public".to_string()),
            allow_implied: false,
            ..Default::default()
        }),
        #[cfg(feature = "templating")]
        template_config: None,
    };
    let result = analyze(&request);
    assert_eq!(result.statements.len(), 2, "Expected CREATE + SELECT");

    let unresolved_warnings: Vec<_> = result
        .issues
        .iter()
        .filter(|issue| {
            issue.code == issue_codes::UNRESOLVED_REFERENCE && issue.severity == Severity::Warning
        })
        .collect();
    assert_eq!(
        unresolved_warnings.len(),
        1,
        "Should have exactly one unresolved reference warning: {:?}",
        result.issues
    );

    // A single substring check covers both the bare and the schema-qualified spelling:
    // any message containing "public.missing_table" also contains "missing_table".
    assert!(
        unresolved_warnings[0].message.contains("missing_table"),
        "Warning should mention missing_table: {:?}",
        unresolved_warnings[0]
    );
}
/// Checks that, under the BigQuery dialect, lineage is traced through a lateral column alias.
///
/// For `SELECT a + 1 AS b, b + 1 AS c FROM t` the test expects no lateral-alias warning,
/// column nodes `a`, `b` and `c`, and a `Derivation` edge going directly from `a` to `c`.
#[test]
fn test_lateral_column_alias_lineage_bigquery() {
    use flowscope_core::types::{EdgeType, NodeType};

    let sql = "SELECT a + 1 AS b, b + 1 AS c FROM t";
    let request = AnalyzeRequest {
        sql: sql.to_string(),
        files: None,
        dialect: Dialect::Bigquery,
        source_name: Some("test_lateral_lineage_bigquery".to_string()),
        options: Some(AnalysisOptions {
            enable_column_lineage: Some(true),
            ..Default::default()
        }),
        schema: None,
        // Gated like the other request literals in this file; presumably the field only
        // exists when the `templating` feature is enabled.
        #[cfg(feature = "templating")]
        template_config: None,
    };
    let result = analyze(&request);

    let lateral_warnings = count_warnings(&result, |issue| {
        issue.code == issue_codes::UNSUPPORTED_SYNTAX
            && issue.message.contains("lateral column alias")
    });
    assert_eq!(
        lateral_warnings, 0,
        "BigQuery should not warn about lateral aliases: {:?}",
        result.issues
    );

    let col_b = result
        .nodes_in_statement(0)
        .find(|n| n.node_type == NodeType::Column && &*n.label == "b");
    let col_c = result
        .nodes_in_statement(0)
        .find(|n| n.node_type == NodeType::Column && &*n.label == "c");
    let col_a = result
        .nodes_in_statement(0)
        .find(|n| n.node_type == NodeType::Column && &*n.label == "a");

    // `b` only needs to exist; `c` and `a` are the endpoints of the edge checked below.
    assert!(col_b.is_some(), "Column 'b' should exist");
    let col_c = col_c.expect("Column 'c' should exist");
    let col_a = col_a.expect("Column 'a' should exist (source from t)");

    let edges: Vec<_> = result.edges_in_statement(0).collect();
    let c_derives_from_a = edges.iter().any(|e| {
        e.from == col_a.id && e.to == col_c.id && e.edge_type == EdgeType::Derivation
    });
    assert!(
        c_derives_from_a,
        "Column 'c' should derive from 'a' (via lateral alias 'b'). Edges: {:?}",
        edges
    );
}
/// Checks that, under the Snowflake dialect, lineage is traced through a lateral column alias.
///
/// For `SELECT x AS first, first * 2 AS doubled FROM data` the test expects no lateral-alias
/// warning, output column nodes labelled `FIRST` and `DOUBLED` (upper-case), a source column
/// matching `x` case-insensitively, and a `Derivation` edge from that source to `DOUBLED`.
#[test]
fn test_lateral_column_alias_lineage_snowflake() {
    use flowscope_core::types::{EdgeType, NodeType};

    let sql = "SELECT x AS first, first * 2 AS doubled FROM data";
    let request = AnalyzeRequest {
        sql: sql.to_string(),
        files: None,
        dialect: Dialect::Snowflake,
        source_name: Some("test_lateral_lineage_snowflake".to_string()),
        options: Some(AnalysisOptions {
            enable_column_lineage: Some(true),
            ..Default::default()
        }),
        schema: None,
        // Gated like the other request literals in this file; presumably the field only
        // exists when the `templating` feature is enabled.
        #[cfg(feature = "templating")]
        template_config: None,
    };
    let result = analyze(&request);

    let lateral_warnings = count_warnings(&result, |issue| {
        issue.code == issue_codes::UNSUPPORTED_SYNTAX
            && issue.message.contains("lateral column alias")
    });
    assert_eq!(lateral_warnings, 0, "Snowflake supports lateral aliases");

    let col_first = result
        .nodes_in_statement(0)
        .find(|n| n.node_type == NodeType::Column && &*n.label == "FIRST");
    let col_doubled = result
        .nodes_in_statement(0)
        .find(|n| n.node_type == NodeType::Column && &*n.label == "DOUBLED");
    // The source column is matched case-insensitively, unlike the two output columns.
    let col_x = result
        .nodes_in_statement(0)
        .find(|n| n.node_type == NodeType::Column && n.label.eq_ignore_ascii_case("x"));

    // `FIRST` only needs to exist; `DOUBLED` and `x` are the endpoints of the edge below.
    assert!(col_first.is_some(), "Column 'FIRST' should exist");
    let col_doubled = col_doubled.expect("Column 'DOUBLED' should exist");
    let col_x = col_x.expect("Column 'x' or 'X' should exist");

    let edges: Vec<_> = result.edges_in_statement(0).collect();
    let doubled_derives_from_x = edges.iter().any(|e| {
        e.from == col_x.id && e.to == col_doubled.id && e.edge_type == EdgeType::Derivation
    });
    assert!(
        doubled_derives_from_x,
        "Column 'DOUBLED' should derive from 'X' (via lateral alias 'FIRST'). Edges: {:?}",
        edges
    );
}
/// Checks that, under the Postgres dialect, a lateral column alias is warned about and is NOT
/// used to trace lineage.
///
/// For `SELECT a + 1 AS b, b + 1 AS c FROM t` the test expects exactly one lateral-alias
/// warning, column nodes `a` and `c`, and neither a `Derivation` nor a `DataFlow` edge going
/// from `a` to `c`.
#[test]
fn test_lateral_column_alias_no_lineage_postgres() {
    use flowscope_core::types::{EdgeType, NodeType};

    let sql = "SELECT a + 1 AS b, b + 1 AS c FROM t";
    let request = AnalyzeRequest {
        sql: sql.to_string(),
        files: None,
        dialect: Dialect::Postgres,
        source_name: Some("test_lateral_no_lineage_postgres".to_string()),
        options: Some(AnalysisOptions {
            enable_column_lineage: Some(true),
            ..Default::default()
        }),
        schema: None,
        // Gated like the other request literals in this file; presumably the field only
        // exists when the `templating` feature is enabled.
        #[cfg(feature = "templating")]
        template_config: None,
    };
    let result = analyze(&request);

    let lateral_warnings = count_warnings(&result, |issue| {
        issue.code == issue_codes::UNSUPPORTED_SYNTAX
            && issue.message.contains("lateral column alias")
    });
    assert_eq!(
        lateral_warnings, 1,
        "PostgreSQL should warn about lateral alias 'b': {:?}",
        result.issues
    );

    let col_c = result
        .nodes_in_statement(0)
        .find(|n| n.node_type == NodeType::Column && &*n.label == "c");
    let col_a = result
        .nodes_in_statement(0)
        .find(|n| n.node_type == NodeType::Column && &*n.label == "a");
    let col_c = col_c.expect("Column 'c' should exist");
    let col_a = col_a.expect("Column 'a' should exist");

    let edges: Vec<_> = result.edges_in_statement(0).collect();
    // Either edge kind from `a` to `c` would mean the alias was resolved.
    let c_derives_from_a = edges.iter().any(|e| {
        e.from == col_a.id
            && e.to == col_c.id
            && (e.edge_type == EdgeType::Derivation || e.edge_type == EdgeType::DataFlow)
    });
    assert!(
        !c_derives_from_a,
        "In PostgreSQL, 'c' should NOT derive from 'a' (lateral alias not resolved). Edges: {:?}",
        edges
    );
}
/// Checks that, under the BigQuery dialect, lineage is traced through a chain of lateral
/// column aliases.
///
/// For `SELECT a AS x, x + 1 AS y, y + 1 AS z FROM t` the test expects no lateral-alias
/// warning, column nodes `a` and `z`, and a `Derivation` edge going directly from `a` to `z`.
#[test]
fn test_lateral_column_alias_chain_lineage() {
    use flowscope_core::types::{EdgeType, NodeType};

    let sql = "SELECT a AS x, x + 1 AS y, y + 1 AS z FROM t";
    let request = AnalyzeRequest {
        sql: sql.to_string(),
        files: None,
        dialect: Dialect::Bigquery,
        source_name: Some("test_lateral_chain".to_string()),
        options: Some(AnalysisOptions {
            enable_column_lineage: Some(true),
            ..Default::default()
        }),
        schema: None,
        // Gated like the other request literals in this file; presumably the field only
        // exists when the `templating` feature is enabled.
        #[cfg(feature = "templating")]
        template_config: None,
    };
    let result = analyze(&request);

    let lateral_warnings = count_warnings(&result, |issue| {
        issue.code == issue_codes::UNSUPPORTED_SYNTAX
            && issue.message.contains("lateral column alias")
    });
    assert_eq!(lateral_warnings, 0, "BigQuery supports lateral aliases");

    let col_a = result
        .nodes_in_statement(0)
        .find(|n| n.node_type == NodeType::Column && &*n.label == "a");
    let col_z = result
        .nodes_in_statement(0)
        .find(|n| n.node_type == NodeType::Column && &*n.label == "z");
    let col_a = col_a.expect("Column 'a' should exist");
    let col_z = col_z.expect("Column 'z' should exist");

    let edges: Vec<_> = result.edges_in_statement(0).collect();
    let z_derives_from_a = edges.iter().any(|e| {
        e.from == col_a.id && e.to == col_z.id && e.edge_type == EdgeType::Derivation
    });
    assert!(
        z_derives_from_a,
        "Column 'z' should derive from 'a' (via chain x -> y -> z). Edges: {:?}",
        edges
    );
}