use std::collections::HashMap;
use super::cache::{CACHE_DIR, COLUMN_LINEAGE_CACHE_FILENAME, ColumnLineageCacheFile};
use super::cross_model::normalize_table_name;
use super::impact::build_downstream_model_map;
use super::single_model::format_lineage_error;
use super::*;
use crate::parser::manifest::{
DependsOn, ManifestColumn, ManifestConfig, ManifestNode, ManifestSource,
};
use polyglot_sql::Schema;
fn make_test_manifest() -> Manifest {
let mut nodes = HashMap::new();
let mut stg_orders_cols = HashMap::new();
for name in ["order_id", "customer_id", "order_date", "status"] {
stg_orders_cols.insert(
name.to_string(),
ManifestColumn {
name: name.to_string(),
},
);
}
nodes.insert(
"model.proj.stg_orders".to_string(),
ManifestNode {
unique_id: "model.proj.stg_orders".to_string(),
name: "stg_orders".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn {
nodes: vec!["source.proj.raw.orders".to_string()],
},
config: ManifestConfig::default(),
description: None,
path: None,
columns: stg_orders_cols,
compiled_code: Some(
"select id as order_id, user_id as customer_id, order_date, status from raw.orders"
.to_string(),
),
database: None,
schema: None,
},
);
let mut orders_cols = HashMap::new();
for name in ["order_id", "customer_id", "total_amount"] {
orders_cols.insert(
name.to_string(),
ManifestColumn {
name: name.to_string(),
},
);
}
nodes.insert("model.proj.orders".to_string(), ManifestNode {
unique_id: "model.proj.orders".to_string(),
name: "orders".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn { nodes: vec![
"model.proj.stg_orders".to_string(),
"model.proj.stg_payments".to_string(),
] },
config: ManifestConfig::default(),
description: None,
path: None,
columns: orders_cols,
compiled_code: Some("select o.order_id, o.customer_id, p.amount as total_amount from stg_orders o left join stg_payments p on o.order_id = p.order_id".to_string()),
database: None,
schema: None,
});
let mut stg_payments_cols = HashMap::new();
for name in ["payment_id", "order_id", "amount", "payment_method"] {
stg_payments_cols.insert(
name.to_string(),
ManifestColumn {
name: name.to_string(),
},
);
}
nodes.insert(
"model.proj.stg_payments".to_string(),
ManifestNode {
unique_id: "model.proj.stg_payments".to_string(),
name: "stg_payments".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn { nodes: vec![] },
config: ManifestConfig::default(),
description: None,
path: None,
columns: stg_payments_cols,
compiled_code: Some(
"select id as payment_id, order_id, amount, payment_method from raw.payments"
.to_string(),
),
database: None,
schema: None,
},
);
let mut source_cols = HashMap::new();
for name in ["id", "user_id", "order_date", "status"] {
source_cols.insert(
name.to_string(),
ManifestColumn {
name: name.to_string(),
},
);
}
let mut sources = HashMap::new();
sources.insert(
"source.proj.raw.orders".to_string(),
ManifestSource {
unique_id: "source.proj.raw.orders".to_string(),
name: "orders".to_string(),
source_name: "raw".to_string(),
resource_type: "source".to_string(),
description: None,
path: None,
columns: source_cols,
database: None,
schema: None,
identifier: None,
},
);
Manifest {
nodes,
sources,
exposures: HashMap::new(),
}
}
fn make_cross_model_manifest() -> Manifest {
let mut nodes = HashMap::new();
let mut raw_orders_cols = HashMap::new();
for name in ["id", "user_id", "order_date", "status"] {
raw_orders_cols.insert(
name.to_string(),
ManifestColumn {
name: name.to_string(),
},
);
}
let mut sources = HashMap::new();
sources.insert(
"source.proj.raw.orders".to_string(),
ManifestSource {
unique_id: "source.proj.raw.orders".to_string(),
name: "orders".to_string(),
source_name: "raw".to_string(),
resource_type: "source".to_string(),
description: None,
path: None,
columns: raw_orders_cols,
database: None,
schema: None,
identifier: None,
},
);
let mut raw_payments_cols = HashMap::new();
for name in ["id", "order_id", "amount", "payment_method"] {
raw_payments_cols.insert(
name.to_string(),
ManifestColumn {
name: name.to_string(),
},
);
}
sources.insert(
"source.proj.raw.payments".to_string(),
ManifestSource {
unique_id: "source.proj.raw.payments".to_string(),
name: "payments".to_string(),
source_name: "raw".to_string(),
resource_type: "source".to_string(),
description: None,
path: None,
columns: raw_payments_cols,
database: None,
schema: None,
identifier: None,
},
);
let mut stg_orders_cols = HashMap::new();
for name in ["order_id", "customer_id", "order_date", "status"] {
stg_orders_cols.insert(
name.to_string(),
ManifestColumn {
name: name.to_string(),
},
);
}
nodes.insert(
"model.proj.stg_orders".to_string(),
ManifestNode {
unique_id: "model.proj.stg_orders".to_string(),
name: "stg_orders".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn {
nodes: vec!["source.proj.raw.orders".to_string()],
},
config: ManifestConfig::default(),
description: None,
path: None,
columns: stg_orders_cols,
compiled_code: Some(
"select id as order_id, user_id as customer_id, order_date, status from orders"
.to_string(),
),
database: None,
schema: None,
},
);
let mut stg_payments_cols = HashMap::new();
for name in ["payment_id", "order_id", "amount", "payment_method"] {
stg_payments_cols.insert(
name.to_string(),
ManifestColumn {
name: name.to_string(),
},
);
}
nodes.insert(
"model.proj.stg_payments".to_string(),
ManifestNode {
unique_id: "model.proj.stg_payments".to_string(),
name: "stg_payments".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn {
nodes: vec!["source.proj.raw.payments".to_string()],
},
config: ManifestConfig::default(),
description: None,
path: None,
columns: stg_payments_cols,
compiled_code: Some(
"select id as payment_id, order_id, amount, payment_method from payments"
.to_string(),
),
database: None,
schema: None,
},
);
let mut orders_cols = HashMap::new();
for name in ["order_id", "customer_id", "total_amount"] {
orders_cols.insert(
name.to_string(),
ManifestColumn {
name: name.to_string(),
},
);
}
nodes.insert(
"model.proj.orders".to_string(),
ManifestNode {
unique_id: "model.proj.orders".to_string(),
name: "orders".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn {
nodes: vec![
"model.proj.stg_orders".to_string(),
"model.proj.stg_payments".to_string(),
],
},
config: ManifestConfig::default(),
description: None,
path: None,
columns: orders_cols,
compiled_code: Some(
concat!(
"with stg_orders as (select * from stg_orders), ",
"stg_payments as (select * from stg_payments) ",
"select stg_orders.order_id, stg_orders.customer_id, ",
"stg_payments.amount as total_amount ",
"from stg_orders left join stg_payments ",
"on stg_orders.order_id = stg_payments.order_id"
)
.to_string(),
),
database: None,
schema: None,
},
);
let mut customers_cols = HashMap::new();
for name in ["customer_id", "order_count"] {
customers_cols.insert(
name.to_string(),
ManifestColumn {
name: name.to_string(),
},
);
}
nodes.insert(
"model.proj.customers".to_string(),
ManifestNode {
unique_id: "model.proj.customers".to_string(),
name: "customers".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn {
nodes: vec!["model.proj.orders".to_string()],
},
config: ManifestConfig::default(),
description: None,
path: None,
columns: customers_cols,
compiled_code: Some(
concat!(
"with orders as (select * from orders) ",
"select customer_id, count(*) as order_count from orders group by customer_id"
)
.to_string(),
),
database: None,
schema: None,
},
);
Manifest {
nodes,
sources,
exposures: HashMap::new(),
}
}
fn make_duplicate_name_manifest() -> Manifest {
let mut nodes = HashMap::new();
nodes.insert(
"model.pkg_a.stg_orders".to_string(),
ManifestNode {
unique_id: "model.pkg_a.stg_orders".to_string(),
name: "stg_orders".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn { nodes: vec![] },
config: ManifestConfig::default(),
description: None,
path: None,
columns: {
let mut cols = HashMap::new();
cols.insert(
"customer_id".to_string(),
ManifestColumn {
name: "customer_id".to_string(),
},
);
cols
},
compiled_code: Some("select customer_id from raw_customers".to_string()),
database: None,
schema: None,
},
);
nodes.insert(
"model.pkg_a.customers".to_string(),
ManifestNode {
unique_id: "model.pkg_a.customers".to_string(),
name: "customers".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn {
nodes: vec!["model.pkg_a.stg_orders".to_string()],
},
config: ManifestConfig::default(),
description: None,
path: None,
columns: {
let mut cols = HashMap::new();
cols.insert(
"customer_id".to_string(),
ManifestColumn {
name: "customer_id".to_string(),
},
);
cols
},
compiled_code: Some("select customer_id from stg_orders".to_string()),
database: None,
schema: None,
},
);
nodes.insert(
"model.pkg_b.customers".to_string(),
ManifestNode {
unique_id: "model.pkg_b.customers".to_string(),
name: "customers".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn {
nodes: vec!["model.pkg_a.stg_orders".to_string()],
},
config: ManifestConfig::default(),
description: None,
path: None,
columns: {
let mut cols = HashMap::new();
cols.insert(
"customer_id".to_string(),
ManifestColumn {
name: "customer_id".to_string(),
},
);
cols
},
compiled_code: Some("select customer_id from stg_orders".to_string()),
database: None,
schema: None,
},
);
Manifest {
nodes,
sources: HashMap::new(),
exposures: HashMap::new(),
}
}
/// Column impact for `stg_orders.customer_id` must report both downstream
/// `customers` models (one per package) as separate entries, with no errors.
#[test]
fn test_column_impact_duplicate_model_names_across_packages() {
    let manifest = make_duplicate_name_manifest();
    let result = compute_column_impact(
        &manifest,
        "stg_orders",
        "customer_id",
        DialectType::Generic,
        &mut ColumnLineageCache::disabled(),
    );
    assert!(result.errors.is_empty(), "errors: {:?}", result.errors);

    // Gather the unique ids of every impacted `customer_id` column.
    let mut unique_ids: Vec<&str> = Vec::new();
    for impacted in result.impacted_columns.iter() {
        if impacted.column == "customer_id" {
            unique_ids.push(impacted.unique_id.as_str());
        }
    }

    let has_pkg_a = unique_ids.iter().any(|id| *id == "model.pkg_a.customers");
    assert!(
        has_pkg_a,
        "pkg_a customers should be impacted, got unique_ids: {:?}",
        unique_ids
    );

    let has_pkg_b = unique_ids.iter().any(|id| *id == "model.pkg_b.customers");
    assert!(
        has_pkg_b,
        "pkg_b customers should be impacted, got unique_ids: {:?}",
        unique_ids
    );

    // Exactly two hits: the same-named models must not be merged or duplicated.
    assert_eq!(
        unique_ids.len(),
        2,
        "both same-named models should appear separately, got: {:?}",
        result.impacted_columns
    );
}
fn make_diamond_manifest() -> Manifest {
let mut nodes = HashMap::new();
let mut raw_cols = HashMap::new();
for name in ["x", "y"] {
raw_cols.insert(
name.to_string(),
ManifestColumn {
name: name.to_string(),
},
);
}
nodes.insert(
"model.proj.raw_data".to_string(),
ManifestNode {
unique_id: "model.proj.raw_data".to_string(),
name: "raw_data".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn { nodes: vec![] },
config: ManifestConfig::default(),
description: None,
path: None,
columns: raw_cols,
compiled_code: Some("select x, y from source_table".to_string()),
database: None,
schema: None,
},
);
let mut shared_cols = HashMap::new();
for name in ["x", "y"] {
shared_cols.insert(
name.to_string(),
ManifestColumn {
name: name.to_string(),
},
);
}
nodes.insert(
"model.proj.shared".to_string(),
ManifestNode {
unique_id: "model.proj.shared".to_string(),
name: "shared".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn {
nodes: vec!["model.proj.raw_data".to_string()],
},
config: ManifestConfig::default(),
description: None,
path: None,
columns: shared_cols,
compiled_code: Some("select x, y from raw_data".to_string()),
database: None,
schema: None,
},
);
let mut left_cols = HashMap::new();
left_cols.insert(
"x".to_string(),
ManifestColumn {
name: "x".to_string(),
},
);
nodes.insert(
"model.proj.left_model".to_string(),
ManifestNode {
unique_id: "model.proj.left_model".to_string(),
name: "left_model".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn {
nodes: vec!["model.proj.shared".to_string()],
},
config: ManifestConfig::default(),
description: None,
path: None,
columns: left_cols,
compiled_code: Some("select x from shared".to_string()),
database: None,
schema: None,
},
);
let mut right_cols = HashMap::new();
right_cols.insert(
"y".to_string(),
ManifestColumn {
name: "y".to_string(),
},
);
nodes.insert(
"model.proj.right_model".to_string(),
ManifestNode {
unique_id: "model.proj.right_model".to_string(),
name: "right_model".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn {
nodes: vec!["model.proj.shared".to_string()],
},
config: ManifestConfig::default(),
description: None,
path: None,
columns: right_cols,
compiled_code: Some("select y from shared".to_string()),
database: None,
schema: None,
},
);
let mut out_cols = HashMap::new();
for name in ["lx", "ry"] {
out_cols.insert(
name.to_string(),
ManifestColumn {
name: name.to_string(),
},
);
}
nodes.insert(
"model.proj.diamond_out".to_string(),
ManifestNode {
unique_id: "model.proj.diamond_out".to_string(),
name: "diamond_out".to_string(),
resource_type: "model".to_string(),
depends_on: DependsOn {
nodes: vec![
"model.proj.left_model".to_string(),
"model.proj.right_model".to_string(),
],
},
config: ManifestConfig::default(),
description: None,
path: None,
columns: out_cols,
compiled_code: Some(
"select l.x as lx, r.y as ry from left_model l join right_model r on 1=1"
.to_string(),
),
database: None,
schema: None,
},
);
Manifest {
nodes,
sources: HashMap::new(),
exposures: HashMap::new(),
}
}
// Child modules whose bodies are defined in separate files (not visible here).
// NOTE(review): presumably these group further tests by area — confirm.
mod cache;
mod core;
mod impact;