use super::*;
/// A model declared with `versions` in `schema.yml` must yield one graph node per version.
///
/// Fixture: `my_model_v1.sql` and `my_model_v2.sql` plus a schema listing versions 1 and 2.
/// Expected: exactly two nodes, labelled `my_model.v1` / `my_model.v2`, with unique ids
/// `model.my_model.v1` / `model.my_model.v2`.
#[test]
fn test_build_graph_versioned_model_creates_version_nodes() {
    let tmp = tempfile::tempdir().expect("failed to create temp dir");
    let project_dir = tmp.path().to_path_buf();
    let models_dir = project_dir.join("models");
    fs::create_dir_all(&models_dir).unwrap();
    fs::write(models_dir.join("my_model_v1.sql"), "SELECT 1 as id").unwrap();
    fs::write(models_dir.join("my_model_v2.sql"), "SELECT 2 as id").unwrap();
    // `latest_version` and `versions` have to be nested under the model entry; written at
    // column 0 YAML treats them as unrelated top-level keys and no versions are declared.
    fs::write(
        models_dir.join("schema.yml"),
        r#"
version: 2
models:
  - name: my_model
    latest_version: 2
    versions:
      - v: 1
      - v: 2
"#,
    )
    .unwrap();

    let files = DiscoveredFiles {
        model_sql_files: vec![
            project_dir.join("models/my_model_v1.sql"),
            project_dir.join("models/my_model_v2.sql"),
        ],
        yaml_files: vec![project_dir.join("models/schema.yml")],
        ..Default::default()
    };
    let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();

    assert_eq!(graph.node_count(), 2);

    let labels: Vec<&str> = graph
        .node_indices()
        .map(|i| graph[i].label.as_str())
        .collect();
    assert!(labels.contains(&"my_model.v1"), "labels were {labels:?}");
    assert!(labels.contains(&"my_model.v2"), "labels were {labels:?}");

    let ids: Vec<&str> = graph
        .node_indices()
        .map(|i| graph[i].unique_id.as_str())
        .collect();
    assert!(ids.contains(&"model.my_model.v1"), "ids were {ids:?}");
    assert!(ids.contains(&"model.my_model.v2"), "ids were {ids:?}");
}
/// `ref('my_model')` without a `version` argument must attach to the `latest_version` node.
///
/// Fixture: versions 1 and 2 of `my_model` (latest is 2) and a `downstream` model that refs
/// `my_model` unversioned. Expected: three nodes, a single edge, and that edge runs from
/// `model.my_model.v2` to `downstream`.
#[test]
fn test_build_graph_versioned_ref_without_version_kwarg_resolves_to_latest() {
    let tmp = tempfile::tempdir().expect("failed to create temp dir");
    let project_dir = tmp.path().to_path_buf();
    let models_dir = project_dir.join("models");
    fs::create_dir_all(&models_dir).unwrap();
    fs::write(models_dir.join("my_model_v1.sql"), "SELECT 1").unwrap();
    fs::write(models_dir.join("my_model_v2.sql"), "SELECT 2").unwrap();
    fs::write(
        models_dir.join("downstream.sql"),
        "SELECT * FROM {{ ref('my_model') }}",
    )
    .unwrap();
    // Nesting matters: `latest_version`/`versions` belong to the `my_model` entry. At
    // column 0 they would be parsed as top-level keys and the model would be unversioned.
    fs::write(
        models_dir.join("schema.yml"),
        r#"
version: 2
models:
  - name: my_model
    latest_version: 2
    versions:
      - v: 1
      - v: 2
"#,
    )
    .unwrap();

    let files = DiscoveredFiles {
        model_sql_files: vec![
            project_dir.join("models/my_model_v1.sql"),
            project_dir.join("models/my_model_v2.sql"),
            project_dir.join("models/downstream.sql"),
        ],
        yaml_files: vec![project_dir.join("models/schema.yml")],
        ..Default::default()
    };
    let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();

    assert_eq!(graph.node_count(), 3);
    assert_eq!(graph.edge_count(), 1);

    let downstream_idx = graph
        .node_indices()
        .find(|&i| graph[i].label == "downstream")
        .expect("downstream node must exist");
    let v2_idx = graph
        .node_indices()
        .find(|&i| graph[i].unique_id == "model.my_model.v2")
        .expect("v2 node must exist");
    assert!(
        graph.contains_edge(v2_idx, downstream_idx),
        "v2 → downstream edge missing"
    );
}
/// `ref('my_model', version=1)` must attach to the explicitly requested version node.
///
/// Fixture: versions 1 and 2 of `my_model` (latest is 2) and a `downstream` model that refs
/// version 1. Expected: three nodes, a single edge, and that edge runs from
/// `model.my_model.v1` to `downstream`.
#[test]
fn test_build_graph_versioned_ref_with_version_kwarg_resolves_to_correct_version() {
    let tmp = tempfile::tempdir().expect("failed to create temp dir");
    let project_dir = tmp.path().to_path_buf();
    let models_dir = project_dir.join("models");
    fs::create_dir_all(&models_dir).unwrap();
    fs::write(models_dir.join("my_model_v1.sql"), "SELECT 1").unwrap();
    fs::write(models_dir.join("my_model_v2.sql"), "SELECT 2").unwrap();
    fs::write(
        models_dir.join("downstream.sql"),
        "SELECT * FROM {{ ref('my_model', version=1) }}",
    )
    .unwrap();
    // `latest_version` and `versions` are properties of the model entry, so they are
    // indented beneath it; flat at column 0 they would become top-level YAML keys.
    fs::write(
        models_dir.join("schema.yml"),
        r#"
version: 2
models:
  - name: my_model
    latest_version: 2
    versions:
      - v: 1
      - v: 2
"#,
    )
    .unwrap();

    let files = DiscoveredFiles {
        model_sql_files: vec![
            project_dir.join("models/my_model_v1.sql"),
            project_dir.join("models/my_model_v2.sql"),
            project_dir.join("models/downstream.sql"),
        ],
        yaml_files: vec![project_dir.join("models/schema.yml")],
        ..Default::default()
    };
    let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();

    assert_eq!(graph.node_count(), 3);
    assert_eq!(graph.edge_count(), 1);

    let downstream_idx = graph
        .node_indices()
        .find(|&i| graph[i].label == "downstream")
        .expect("downstream node must exist");
    let v1_idx = graph
        .node_indices()
        .find(|&i| graph[i].unique_id == "model.my_model.v1")
        .expect("v1 node must exist");
    assert!(
        graph.contains_edge(v1_idx, downstream_idx),
        "v1 → downstream edge missing"
    );
}
/// References found inside versioned model SQL must produce edges on the version nodes.
///
/// Fixture: `my_model_v1.sql` selects from `source('raw', 'events')`, `my_model_v2.sql`
/// selects from `ref('my_model', version=1)`. Expected: three nodes (source, v1, v2) and two
/// edges: `source.raw.events` → v1 and v1 → v2.
#[test]
fn test_build_graph_versioned_model_sql_edges_processed() {
    let tmp = tempfile::tempdir().expect("failed to create temp dir");
    let project_dir = tmp.path().to_path_buf();
    let models_dir = project_dir.join("models");
    fs::create_dir_all(&models_dir).unwrap();
    fs::write(
        models_dir.join("my_model_v1.sql"),
        "SELECT * FROM {{ source('raw', 'events') }}",
    )
    .unwrap();
    fs::write(
        models_dir.join("my_model_v2.sql"),
        "SELECT * FROM {{ ref('my_model', version=1) }}",
    )
    .unwrap();
    // `tables` is nested under the `raw` source and `latest_version`/`versions` under the
    // model; without the indentation each would be a stray top-level key and neither the
    // source table nor the versions would be declared.
    fs::write(
        models_dir.join("schema.yml"),
        r#"
version: 2
sources:
  - name: raw
    tables:
      - name: events
models:
  - name: my_model
    latest_version: 2
    versions:
      - v: 1
      - v: 2
"#,
    )
    .unwrap();

    let files = DiscoveredFiles {
        model_sql_files: vec![
            project_dir.join("models/my_model_v1.sql"),
            project_dir.join("models/my_model_v2.sql"),
        ],
        yaml_files: vec![project_dir.join("models/schema.yml")],
        ..Default::default()
    };
    let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();

    assert_eq!(graph.node_count(), 3);
    assert_eq!(graph.edge_count(), 2);

    let v1_idx = graph
        .node_indices()
        .find(|&i| graph[i].unique_id == "model.my_model.v1")
        .expect("v1 node must exist");
    let v2_idx = graph
        .node_indices()
        .find(|&i| graph[i].unique_id == "model.my_model.v2")
        .expect("v2 node must exist");
    let src_idx = graph
        .node_indices()
        .find(|&i| graph[i].unique_id == "source.raw.events")
        .expect("source node must exist");

    assert!(
        graph.contains_edge(src_idx, v1_idx),
        "source→v1 edge missing"
    );
    assert!(graph.contains_edge(v1_idx, v2_idx), "v1→v2 edge missing");
}
/// A ref to a version that is not declared must produce a phantom node for that version.
///
/// Fixture: only version 2 of `my_model` exists (SQL and schema), while `downstream` refs
/// `my_model` with `version=1`. Expected: three nodes, one of them a `NodeType::Phantom`
/// with id `model.my_model.v1` feeding `downstream`, and no edge from v2 to `downstream`.
#[test]
fn test_build_graph_versioned_phantom_for_missing_version() {
    let tmp = tempfile::tempdir().expect("failed to create temp dir");
    let project_dir = tmp.path().to_path_buf();
    let models_dir = project_dir.join("models");
    fs::create_dir_all(&models_dir).unwrap();
    fs::write(models_dir.join("my_model_v2.sql"), "SELECT 2").unwrap();
    fs::write(
        models_dir.join("downstream.sql"),
        "SELECT * FROM {{ ref('my_model', version=1) }}",
    )
    .unwrap();
    // `latest_version`/`versions` must sit under the model entry; flat at column 0 they
    // would be top-level keys and `my_model` would not be versioned at all.
    fs::write(
        models_dir.join("schema.yml"),
        r#"
version: 2
models:
  - name: my_model
    latest_version: 2
    versions:
      - v: 2
"#,
    )
    .unwrap();

    let files = DiscoveredFiles {
        model_sql_files: vec![
            project_dir.join("models/my_model_v2.sql"),
            project_dir.join("models/downstream.sql"),
        ],
        yaml_files: vec![project_dir.join("models/schema.yml")],
        ..Default::default()
    };
    let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();

    assert_eq!(graph.node_count(), 3);

    let phantom = graph
        .node_indices()
        .find(|&i| graph[i].node_type == NodeType::Phantom)
        .expect("phantom v1 node must exist");
    assert_eq!(graph[phantom].unique_id, "model.my_model.v1");

    let downstream_idx = graph
        .node_indices()
        .find(|&i| graph[i].label == "downstream")
        .expect("downstream node must exist");
    assert!(
        graph.contains_edge(phantom, downstream_idx),
        "phantom.v1 → downstream edge missing"
    );

    let v2_idx = graph
        .node_indices()
        .find(|&i| graph[i].unique_id == "model.my_model.v2")
        .expect("v2 node must exist");
    assert!(
        !graph.contains_edge(v2_idx, downstream_idx),
        "v2 → downstream edge must not exist"
    );
}
/// A version whose SQL lives in a custom file (`defined_in`) must still become a version node.
///
/// Fixture: version 1 uses `my_model_v1.sql`; version 2 sets `defined_in: custom_v2_file`
/// and its SQL is `custom_v2_file.sql`. Expected: exactly two nodes with ids
/// `model.my_model.v1` and `model.my_model.v2`.
#[test]
fn test_build_graph_versioned_model_custom_defined_in() {
    let tmp = tempfile::tempdir().expect("failed to create temp dir");
    let project_dir = tmp.path().to_path_buf();
    let models_dir = project_dir.join("models");
    fs::create_dir_all(&models_dir).unwrap();
    fs::write(models_dir.join("my_model_v1.sql"), "SELECT 1").unwrap();
    fs::write(models_dir.join("custom_v2_file.sql"), "SELECT 2").unwrap();
    // `defined_in` is a property of the `v: 2` entry, so it is indented to line up with
    // `v`; at column 0 it would be a top-level key and never be associated with version 2.
    fs::write(
        models_dir.join("schema.yml"),
        r#"
version: 2
models:
  - name: my_model
    latest_version: 2
    versions:
      - v: 1
      - v: 2
        defined_in: custom_v2_file
"#,
    )
    .unwrap();

    let files = DiscoveredFiles {
        model_sql_files: vec![
            project_dir.join("models/my_model_v1.sql"),
            project_dir.join("models/custom_v2_file.sql"),
        ],
        yaml_files: vec![project_dir.join("models/schema.yml")],
        ..Default::default()
    };
    let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();

    assert_eq!(graph.node_count(), 2);

    let ids: Vec<&str> = graph
        .node_indices()
        .map(|i| graph[i].unique_id.as_str())
        .collect();
    assert!(ids.contains(&"model.my_model.v1"), "ids were {ids:?}");
    assert!(ids.contains(&"model.my_model.v2"), "ids were {ids:?}");
}
/// When a version's `defined_in` is the model's base name, edges parsed from that file must
/// attach to that version and not to another one.
///
/// Fixture: version 1 sets `defined_in: my_model`, and `my_model.sql` selects from
/// `source('raw', 'events')`; version 2 uses `my_model_v2.sql` with no references.
/// Expected: three nodes, a single edge `source.raw.events` → v1, and no source → v2 edge.
#[test]
fn test_build_graph_versioned_defined_in_base_name_edges_correct_version() {
    let tmp = tempfile::tempdir().expect("failed to create temp dir");
    let project_dir = tmp.path().to_path_buf();
    let models_dir = project_dir.join("models");
    fs::create_dir_all(&models_dir).unwrap();
    fs::write(
        models_dir.join("my_model.sql"),
        "SELECT * FROM {{ source('raw', 'events') }}",
    )
    .unwrap();
    fs::write(models_dir.join("my_model_v2.sql"), "SELECT 2").unwrap();
    // Indentation carries the meaning here: `tables` belongs to source `raw`, and
    // `defined_in` belongs to the `v: 1` entry. Flat at column 0 they would all be
    // top-level keys and the fixture would describe neither the source table nor version 1.
    fs::write(
        models_dir.join("schema.yml"),
        r#"
version: 2
sources:
  - name: raw
    tables:
      - name: events
models:
  - name: my_model
    latest_version: 2
    versions:
      - v: 1
        defined_in: my_model
      - v: 2
"#,
    )
    .unwrap();

    let files = DiscoveredFiles {
        model_sql_files: vec![
            project_dir.join("models/my_model.sql"),
            project_dir.join("models/my_model_v2.sql"),
        ],
        yaml_files: vec![project_dir.join("models/schema.yml")],
        ..Default::default()
    };
    let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();

    assert_eq!(graph.node_count(), 3);
    assert_eq!(graph.edge_count(), 1);

    let v1_idx = graph
        .node_indices()
        .find(|&i| graph[i].unique_id == "model.my_model.v1")
        .expect("v1 node must exist");
    let src_idx = graph
        .node_indices()
        .find(|&i| graph[i].unique_id == "source.raw.events")
        .expect("source node must exist");
    let v2_idx = graph
        .node_indices()
        .find(|&i| graph[i].unique_id == "model.my_model.v2")
        .expect("v2 node must exist");

    assert!(
        graph.contains_edge(src_idx, v1_idx),
        "source→v1 edge missing: edges in my_model.sql must attach to v1"
    );
    assert!(
        !graph.contains_edge(src_idx, v2_idx),
        "source→v2 edge must not exist"
    );
}
/// An unversioned ref must resolve to the latest version even when that version has no SQL
/// file, in which case the target is a phantom node.
///
/// Fixture: the schema declares versions 1 and 2 (latest is 2) but only `my_model_v1.sql`
/// exists; `downstream` refs `my_model` unversioned. Expected: three nodes, one of them a
/// `NodeType::Phantom` with id `model.my_model.v2` that feeds `downstream`.
#[test]
fn test_build_graph_unversioned_ref_resolves_to_versioned_phantom_when_sql_missing() {
    let tmp = tempfile::tempdir().expect("failed to create temp dir");
    let project_dir = tmp.path().to_path_buf();
    let models_dir = project_dir.join("models");
    fs::create_dir_all(&models_dir).unwrap();
    fs::write(models_dir.join("my_model_v1.sql"), "SELECT 1").unwrap();
    fs::write(
        models_dir.join("downstream.sql"),
        "SELECT * FROM {{ ref('my_model') }}",
    )
    .unwrap();
    // `latest_version`/`versions` are nested under the model entry; at column 0 they would
    // be top-level keys and there would be no declared latest version to resolve to.
    fs::write(
        models_dir.join("schema.yml"),
        r#"
version: 2
models:
  - name: my_model
    latest_version: 2
    versions:
      - v: 1
      - v: 2
"#,
    )
    .unwrap();

    let files = DiscoveredFiles {
        model_sql_files: vec![
            project_dir.join("models/my_model_v1.sql"),
            project_dir.join("models/downstream.sql"),
        ],
        yaml_files: vec![project_dir.join("models/schema.yml")],
        ..Default::default()
    };
    let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();

    assert_eq!(graph.node_count(), 3);

    let phantom = graph
        .node_indices()
        .find(|&i| graph[i].node_type == NodeType::Phantom)
        .expect("phantom v2 node must exist");
    assert_eq!(graph[phantom].unique_id, "model.my_model.v2");

    let downstream_idx = graph
        .node_indices()
        .find(|&i| graph[i].label == "downstream")
        .expect("downstream node must exist");
    assert!(
        graph.contains_edge(phantom, downstream_idx),
        "phantom.v2 → downstream edge missing"
    );
}
/// When two schema files declare the same versioned model, the later file must not override
/// the `latest_version` taken from the earlier one.
///
/// Fixture: `schema_a.yml` declares `orders` with `latest_version: 2`, `schema_b.yml`
/// declares it again with `latest_version: 1`; the files are passed in that order.
/// `downstream` refs `orders` unversioned. Expected: three `NodeType::Model` nodes, an edge
/// from `model.orders.v2` to `downstream`, and no edge from `model.orders.v1`.
#[test]
fn test_stem_to_versioned_no_duplicate_overwrite() {
    let tmp = tempfile::tempdir().expect("failed to create temp dir");
    let project_dir = tmp.path().to_path_buf();
    let models_dir = project_dir.join("models");
    fs::create_dir_all(&models_dir).unwrap();

    // In both documents `versions` and `latest_version` are indented under the `orders`
    // entry; at column 0 they would be top-level keys and `orders` would carry no versions.
    let yaml_a = r#"
models:
  - name: orders
    versions:
      - v: 1
      - v: 2
    latest_version: 2
"#;
    let yaml_b = r#"
models:
  - name: orders
    versions:
      - v: 1
      - v: 2
    latest_version: 1
"#;

    fs::write(models_dir.join("orders_v1.sql"), "SELECT 1").unwrap();
    fs::write(models_dir.join("orders_v2.sql"), "SELECT 2").unwrap();
    fs::write(
        models_dir.join("downstream.sql"),
        "SELECT * FROM {{ ref('orders') }}",
    )
    .unwrap();
    fs::write(models_dir.join("schema_a.yml"), yaml_a).unwrap();
    fs::write(models_dir.join("schema_b.yml"), yaml_b).unwrap();

    let files = DiscoveredFiles {
        model_sql_files: vec![
            project_dir.join("models/orders_v1.sql"),
            project_dir.join("models/orders_v2.sql"),
            project_dir.join("models/downstream.sql"),
        ],
        // Order matters for this test: schema_a (latest_version 2) comes first.
        yaml_files: vec![
            project_dir.join("models/schema_a.yml"),
            project_dir.join("models/schema_b.yml"),
        ],
        ..Default::default()
    };
    let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();

    let model_node_count = graph
        .node_indices()
        .filter(|&i| matches!(graph[i].node_type, NodeType::Model))
        .count();
    assert_eq!(model_node_count, 3);

    let downstream_idx = graph
        .node_indices()
        .find(|&i| graph[i].label == "downstream")
        .expect("downstream node must exist");
    let v2_idx = graph
        .node_indices()
        .find(|&i| graph[i].unique_id == "model.orders.v2")
        .expect("orders v2 node must exist");
    let v1_idx = graph
        .node_indices()
        .find(|&i| graph[i].unique_id == "model.orders.v1")
        .expect("orders v1 node must exist");

    assert!(
        graph.contains_edge(v2_idx, downstream_idx),
        "unversioned ref should resolve to v2 (first file's latest_version)"
    );
    assert!(
        !graph.contains_edge(v1_idx, downstream_idx),
        "ref must not resolve to v1 (second file's latest_version)"
    );
}