use super::*;
/// Checks that the impact computed for `stg_orders.order_id` completes without
/// errors and lists `orders.order_id` among the impacted columns.
#[test]
fn test_column_impact_direct_dependent() {
    let manifest = make_cross_model_manifest();
    let mut cache = ColumnLineageCache::disabled();
    let result = compute_column_impact(
        &manifest,
        "stg_orders",
        "order_id",
        DialectType::Generic,
        &mut cache,
    );
    assert!(result.errors.is_empty(), "errors: {:?}", result.errors);

    // Evaluate the membership check up front so the assertion reads as a plain flag.
    let orders_hit = result
        .impacted_columns
        .iter()
        .any(|ic| ic.model == "orders" && ic.column == "order_id");
    assert!(
        orders_hit,
        "orders.order_id should be impacted, got: {:?}",
        result.impacted_columns
    );
}
/// Checks that the impact computed for `stg_orders.customer_id` completes
/// without errors and reports both `orders.customer_id` and
/// `customers.customer_id`.
#[test]
fn test_column_impact_two_hops() {
    let manifest = make_cross_model_manifest();
    let result = compute_column_impact(
        &manifest,
        "stg_orders",
        "customer_id",
        DialectType::Generic,
        &mut ColumnLineageCache::disabled(),
    );
    assert!(result.errors.is_empty(), "errors: {:?}", result.errors);

    // True when `<model>.customer_id` appears in the reported impact set.
    let customer_id_impacted_in = |model: &str| {
        result
            .impacted_columns
            .iter()
            .any(|ic| ic.model == model && ic.column == "customer_id")
    };

    assert!(
        customer_id_impacted_in("orders"),
        "orders.customer_id should be impacted, got: {:?}",
        result.impacted_columns
    );
    assert!(
        customer_id_impacted_in("customers"),
        "customers.customer_id should be impacted, got: {:?}",
        result.impacted_columns
    );
}
/// Checks that, when the impact of `stg_orders.customer_id` is computed, the
/// `model_path` recorded for `customers.customer_id` contains an entry whose
/// first element is `orders`.
#[test]
fn test_column_impact_model_path() {
    let manifest = make_cross_model_manifest();
    let result = compute_column_impact(
        &manifest,
        "stg_orders",
        "customer_id",
        DialectType::Generic,
        &mut ColumnLineageCache::disabled(),
    );
    // Surface analysis errors first, matching the other happy-path tests, so a
    // failed lookup below is not mistaken for a path-tracking problem.
    assert!(result.errors.is_empty(), "errors: {:?}", result.errors);

    let cust = result
        .impacted_columns
        .iter()
        .find(|ic| ic.model == "customers" && ic.column == "customer_id")
        .unwrap_or_else(|| {
            // Dump what was actually reported instead of a bare `unwrap` panic.
            panic!(
                "customers.customer_id should be impacted, got: {:?}",
                result.impacted_columns
            )
        });
    assert!(
        cust.model_path.iter().any(|(m, _, _)| m == "orders"),
        "model_path should include orders, got: {:?}",
        cust.model_path
    );
}
/// Checks that computing the impact of `customers.customer_id` produces no
/// errors and an empty list of impacted columns.
#[test]
fn test_column_impact_no_dependents() {
    let manifest = make_cross_model_manifest();
    let mut cache = ColumnLineageCache::disabled();
    let outcome = compute_column_impact(
        &manifest,
        "customers",
        "customer_id",
        DialectType::Generic,
        &mut cache,
    );

    let errors = &outcome.errors;
    assert!(errors.is_empty(), "errors: {:?}", errors);

    let impacted = &outcome.impacted_columns;
    assert!(
        impacted.is_empty(),
        "leaf model should have no impacted columns, got: {:?}",
        impacted
    );
}
/// Checks that asking for the impact of a column on a model name that is not
/// in the manifest yields at least one error, and that the first error's
/// `what` text contains "not found".
#[test]
fn test_column_impact_model_not_found() {
    let manifest = make_cross_model_manifest();
    let result = compute_column_impact(
        &manifest,
        "nonexistent",
        "col",
        DialectType::Generic,
        &mut ColumnLineageCache::disabled(),
    );

    // `first()` replaces the `[0]` index so an empty error list fails with a
    // descriptive message rather than a generic assertion text.
    let first_error = result
        .errors
        .first()
        .expect("an unknown model should produce at least one error");
    assert!(
        first_error.what.contains("not found"),
        "first error should mention \"not found\", got: {:?}",
        result.errors
    );
}
/// Checks the JSON form of the impact result for `stg_orders.order_id`: the
/// top-level `model` and `column` fields echo the request, `impacted_columns`
/// is a non-empty array, and its first element carries a string `unique_id`.
#[test]
fn test_column_impact_json_serialization() {
    let manifest = make_cross_model_manifest();
    let result = compute_column_impact(
        &manifest,
        "stg_orders",
        "order_id",
        DialectType::Generic,
        &mut ColumnLineageCache::disabled(),
    );

    let json =
        serde_json::to_string_pretty(&result).expect("impact result should serialize to JSON");
    let parsed: serde_json::Value =
        serde_json::from_str(&json).expect("serialized impact result should parse back as JSON");

    assert_eq!(parsed["model"], "stg_orders");
    assert_eq!(parsed["column"], "order_id");

    let impacted = parsed["impacted_columns"]
        .as_array()
        .expect("impacted_columns should serialize as a JSON array");
    // Indexing a `serde_json::Value` array out of range yields `Null`, which
    // would make the `unique_id` check below fail for the wrong reason; check
    // for a first element explicitly instead.
    let first = impacted
        .first()
        .unwrap_or_else(|| panic!("impacted_columns should not be empty, json: {}", json));
    assert!(
        first["unique_id"].is_string(),
        "unique_id should be serialized in impacted_columns"
    );
}
/// Checks column-level separation in the diamond manifest: the impact of
/// `raw_data.x` includes `shared.x` and `left_model.x` but not
/// `right_model.y`, while the impact of `raw_data.y` includes `shared.y` and
/// `right_model.y` but not `left_model.x`.
#[test]
fn test_column_impact_diamond_different_columns_through_shared_model() {
    let manifest = make_diamond_manifest();

    // --- impact of raw_data.x ---
    let impact_x = compute_column_impact(
        &manifest,
        "raw_data",
        "x",
        DialectType::Generic,
        &mut ColumnLineageCache::disabled(),
    );
    assert!(impact_x.errors.is_empty(), "errors: {:?}", impact_x.errors);

    let mut impacted_names: Vec<(&str, &str)> = Vec::new();
    for ic in impact_x.impacted_columns.iter() {
        impacted_names.push((ic.model.as_str(), ic.column.as_str()));
    }

    let x_hits_shared = impacted_names
        .iter()
        .any(|&(model, column)| model == "shared" && column == "x");
    assert!(
        x_hits_shared,
        "x should impact shared.x, got: {:?}",
        impacted_names
    );
    let x_hits_left = impacted_names
        .iter()
        .any(|&(model, column)| model == "left_model" && column == "x");
    assert!(
        x_hits_left,
        "x should impact left_model.x, got: {:?}",
        impacted_names
    );
    let x_hits_right_y = impacted_names
        .iter()
        .any(|&(model, column)| model == "right_model" && column == "y");
    assert!(!x_hits_right_y, "x should not impact right_model.y");

    // --- impact of raw_data.y ---
    let impact_y = compute_column_impact(
        &manifest,
        "raw_data",
        "y",
        DialectType::Generic,
        &mut ColumnLineageCache::disabled(),
    );
    assert!(impact_y.errors.is_empty(), "errors: {:?}", impact_y.errors);

    let mut impacted_names_y: Vec<(&str, &str)> = Vec::new();
    for ic in impact_y.impacted_columns.iter() {
        impacted_names_y.push((ic.model.as_str(), ic.column.as_str()));
    }

    let y_hits_shared = impacted_names_y
        .iter()
        .any(|&(model, column)| model == "shared" && column == "y");
    assert!(
        y_hits_shared,
        "y should impact shared.y, got: {:?}",
        impacted_names_y
    );
    let y_hits_right = impacted_names_y
        .iter()
        .any(|&(model, column)| model == "right_model" && column == "y");
    assert!(
        y_hits_right,
        "y should impact right_model.y, got: {:?}",
        impacted_names_y
    );
    let y_hits_left_x = impacted_names_y
        .iter()
        .any(|&(model, column)| model == "left_model" && column == "x");
    assert!(!y_hits_left_x, "y should not impact left_model.x");
}
/// Checks the downstream map built from the cross-model manifest:
/// `model.proj.stg_orders` lists `model.proj.orders`, `model.proj.orders`
/// lists `model.proj.customers`, and `model.proj.customers` has no entry.
#[test]
fn test_build_downstream_model_map() {
    let manifest = make_cross_model_manifest();
    let map = build_downstream_model_map(&manifest);

    // True when `upstream` has an entry in the map that contains `downstream`.
    let feeds = |upstream: &str, downstream: &str| match map.get(upstream) {
        Some(deps) => deps.contains(&downstream.to_string()),
        None => false,
    };

    assert!(
        feeds("model.proj.stg_orders", "model.proj.orders"),
        "stg_orders should have orders as downstream, got: {:?}",
        map.get("model.proj.stg_orders")
    );
    assert!(
        feeds("model.proj.orders", "model.proj.customers"),
        "orders should have customers as downstream, got: {:?}",
        map.get("model.proj.orders")
    );

    let customers_downstream = map.get("model.proj.customers");
    assert!(
        customers_downstream.is_none(),
        "customers should have no downstream"
    );
}
/// Checks multi-path reporting for `source_model.x` in the reconverging
/// manifest: `final_model.x`, `mart_model.x` and `dashboard_model.x` must each
/// be reported at least twice. For `mart_model` and `dashboard_model`, the
/// first two entries must carry different `model_path`s, and among all entries
/// at least one path must mention `left_model` and one `right_model`.
#[test]
fn test_column_impact_reconverging_dag_multi_path() {
    let manifest = make_reconverging_manifest();
    let mut cache = ColumnLineageCache::disabled();
    let result = compute_column_impact(
        &manifest,
        "source_model",
        "x",
        DialectType::Generic,
        &mut cache,
    );

    // All reported entries for `<model>.x`, in reporting order.
    let entries_for = |model: &str| -> Vec<_> {
        result
            .impacted_columns
            .iter()
            .filter(|ic| ic.model == model && ic.column == "x")
            .collect()
    };

    // --- final_model.x ---
    let final_entries = entries_for("final_model");
    assert!(
        final_entries.len() >= 2,
        "final_model.x should appear for each upstream path (left and right), \
         got {} entries. impacted: {:?}",
        final_entries.len(),
        result.impacted_columns
    );

    // --- mart_model.x ---
    let mart_entries = entries_for("mart_model");
    assert!(
        mart_entries.len() >= 2,
        "mart_model.x should appear for each upstream path, got {} entries. impacted: {:?}",
        mart_entries.len(),
        result.impacted_columns
    );
    // Collect the paths once; they feed both the checks and the failure output.
    let mart_paths: Vec<_> = mart_entries.iter().map(|ic| &ic.model_path).collect();
    assert!(
        mart_paths[0] != mart_paths[1],
        "mart_model.x entries should have distinct model_paths, both are: {:?}",
        mart_paths[0]
    );
    let mart_via_left = mart_paths
        .iter()
        .any(|path| path.iter().any(|(m, _, _)| m == "left_model"));
    let mart_via_right = mart_paths
        .iter()
        .any(|path| path.iter().any(|(m, _, _)| m == "right_model"));
    assert!(
        mart_via_left,
        "one mart_model.x path should pass through left_model, paths: {:?}",
        mart_paths
    );
    assert!(
        mart_via_right,
        "one mart_model.x path should pass through right_model, paths: {:?}",
        mart_paths
    );

    // --- dashboard_model.x ---
    let dashboard_entries = entries_for("dashboard_model");
    assert!(
        dashboard_entries.len() >= 2,
        "dashboard_model.x should appear for each upstream path, got {} entries. impacted: {:?}",
        dashboard_entries.len(),
        result.impacted_columns
    );
    let dashboard_paths: Vec<_> = dashboard_entries
        .iter()
        .map(|ic| &ic.model_path)
        .collect();
    assert!(
        dashboard_paths[0] != dashboard_paths[1],
        "dashboard_model.x entries should have distinct model_paths, both are: {:?}",
        dashboard_paths[0]
    );
    let dashboard_via_left = dashboard_paths
        .iter()
        .any(|path| path.iter().any(|(m, _, _)| m == "left_model"));
    let dashboard_via_right = dashboard_paths
        .iter()
        .any(|path| path.iter().any(|(m, _, _)| m == "right_model"));
    assert!(
        dashboard_via_left,
        "one dashboard_model.x path should pass through left_model, paths: {:?}",
        dashboard_paths
    );
    assert!(
        dashboard_via_right,
        "one dashboard_model.x path should pass through right_model, paths: {:?}",
        dashboard_paths
    );
}