use anyhow::Result;
use petgraph::stable_graph::NodeIndex;
use std::collections::HashMap;
use std::path::Path;
use crate::parser::discovery::DiscoveredFiles;
use crate::parser::sql::{extract_refs, extract_sources};
use crate::parser::yaml_schema::{parse_schema_file, ExposureDefinition};
use super::types::*;
pub fn build_graph(project_dir: &Path, files: &DiscoveredFiles) -> Result<LineageGraph> {
let mut graph = LineageGraph::new();
let mut node_map: HashMap<String, NodeIndex> = HashMap::new();
let mut model_name_paths: HashMap<String, std::path::PathBuf> = HashMap::new();
let mut model_descriptions: HashMap<String, String> = HashMap::new();
let mut exposures: Vec<ExposureDefinition> = Vec::new();
for yaml_path in &files.yaml_files {
let content = std::fs::read_to_string(yaml_path).map_err(|e| {
crate::error::DbtLineageError::FileReadError {
path: yaml_path.clone(),
source: e,
}
})?;
let schema = match parse_schema_file(&content) {
Ok(s) => s,
Err(_) => continue, };
for source_def in &schema.sources {
for table in &source_def.tables {
let unique_id = format!("source.{}.{}", source_def.name, table.name);
let label = format!("{}.{}", source_def.name, table.name);
let idx = graph.add_node(NodeData {
unique_id: unique_id.clone(),
label,
node_type: NodeType::Source,
file_path: Some(yaml_path.clone()),
description: table
.description
.clone()
.or_else(|| source_def.description.clone()),
});
node_map.insert(unique_id, idx);
}
}
for model_def in &schema.models {
if let Some(desc) = &model_def.description {
model_descriptions.insert(model_def.name.clone(), desc.clone());
}
}
exposures.extend(schema.exposures.into_iter());
}
for sql_path in &files.model_sql_files {
let model_name = sql_path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown")
.to_string();
if let Some(existing_path) = model_name_paths.get(&model_name) {
eprintln!(
"Warning: duplicate model name '{}' in {} and {}",
model_name,
existing_path.display(),
sql_path.display()
);
}
model_name_paths.insert(model_name.clone(), sql_path.clone());
let unique_id = format!("model.{}", model_name);
let relative_path = sql_path
.strip_prefix(project_dir)
.unwrap_or(sql_path)
.to_path_buf();
let idx = graph.add_node(NodeData {
unique_id: unique_id.clone(),
label: model_name.clone(),
node_type: NodeType::Model,
file_path: Some(relative_path),
description: model_descriptions.get(&model_name).cloned(),
});
node_map.insert(unique_id, idx);
}
for seed_path in &files.seed_files {
let seed_name = seed_path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown")
.to_string();
let unique_id = format!("seed.{}", seed_name);
let relative_path = seed_path
.strip_prefix(project_dir)
.unwrap_or(seed_path)
.to_path_buf();
let idx = graph.add_node(NodeData {
unique_id: unique_id.clone(),
label: seed_name,
node_type: NodeType::Seed,
file_path: Some(relative_path),
description: None,
});
node_map.insert(unique_id, idx);
}
for sql_path in &files.snapshot_sql_files {
let snap_name = sql_path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown")
.to_string();
let unique_id = format!("snapshot.{}", snap_name);
let relative_path = sql_path
.strip_prefix(project_dir)
.unwrap_or(sql_path)
.to_path_buf();
let idx = graph.add_node(NodeData {
unique_id: unique_id.clone(),
label: snap_name,
node_type: NodeType::Snapshot,
file_path: Some(relative_path),
description: None,
});
node_map.insert(unique_id, idx);
}
let all_sql_files: Vec<(&std::path::PathBuf, &str)> = files
.model_sql_files
.iter()
.map(|p| (p, "model"))
.chain(files.snapshot_sql_files.iter().map(|p| (p, "snapshot")))
.chain(files.test_sql_files.iter().map(|p| (p, "test")))
.collect();
for (sql_path, file_type) in &all_sql_files {
let content = std::fs::read_to_string(sql_path).map_err(|e| {
crate::error::DbtLineageError::FileReadError {
path: (*sql_path).clone(),
source: e,
}
})?;
let node_name = sql_path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown")
.to_string();
let node_unique_id = format!("{}.{}", file_type, node_name);
if *file_type == "test" {
let relative_path = sql_path
.strip_prefix(project_dir)
.unwrap_or(sql_path)
.to_path_buf();
let idx = graph.add_node(NodeData {
unique_id: node_unique_id.clone(),
label: node_name,
node_type: NodeType::Test,
file_path: Some(relative_path),
description: None,
});
node_map.insert(node_unique_id.clone(), idx);
}
let current_idx = match node_map.get(&node_unique_id) {
Some(idx) => *idx,
None => continue,
};
let refs = extract_refs(&content);
for ref_call in refs {
let dep_id = resolve_ref(&ref_call.name, &node_map);
let dep_idx = match node_map.get(&dep_id) {
Some(idx) => *idx,
None => {
eprintln!(
"Warning: unresolved ref '{}' in {}",
ref_call.name,
sql_path.display()
);
let phantom_id = format!("model.{}", ref_call.name);
let idx = graph.add_node(NodeData {
unique_id: phantom_id.clone(),
label: ref_call.name,
node_type: NodeType::Phantom,
file_path: None,
description: None,
});
node_map.insert(phantom_id, idx);
idx
}
};
graph.add_edge(
dep_idx,
current_idx,
EdgeData {
edge_type: EdgeType::Ref,
},
);
}
let sources = extract_sources(&content);
for source_call in sources {
let source_id = format!(
"source.{}.{}",
source_call.source_name, source_call.table_name
);
let source_idx = match node_map.get(&source_id) {
Some(idx) => *idx,
None => {
eprintln!(
"Warning: unresolved source '{}.{}' in {}",
source_call.source_name,
source_call.table_name,
sql_path.display()
);
let label = format!("{}.{}", source_call.source_name, source_call.table_name);
let idx = graph.add_node(NodeData {
unique_id: source_id.clone(),
label,
node_type: NodeType::Phantom,
file_path: None,
description: None,
});
node_map.insert(source_id, idx);
idx
}
};
graph.add_edge(
source_idx,
current_idx,
EdgeData {
edge_type: EdgeType::Source,
},
);
}
}
for exposure in &exposures {
let unique_id = format!("exposure.{}", exposure.name);
let idx = graph.add_node(NodeData {
unique_id: unique_id.clone(),
label: exposure.name.clone(),
node_type: NodeType::Exposure,
file_path: None,
description: exposure.description.clone(),
});
node_map.insert(unique_id, idx);
for dep in &exposure.depends_on {
if let Some(model_name) = parse_exposure_ref(dep) {
let dep_id = resolve_ref(&model_name, &node_map);
if let Some(&dep_idx) = node_map.get(&dep_id) {
graph.add_edge(
dep_idx,
idx,
EdgeData {
edge_type: EdgeType::Exposure,
},
);
}
}
}
}
Ok(graph)
}
/// Maps a `ref()` target name to the unique id of the node it points at.
///
/// Candidate ids are tried in priority order (model, then seed, then
/// snapshot), and the first one present in `node_map` is returned. When none
/// is present, the model id `model.<name>` is returned as the fallback.
fn resolve_ref(name: &str, node_map: &HashMap<String, NodeIndex>) -> String {
    const PRIORITY: [&str; 3] = ["model", "seed", "snapshot"];
    PRIORITY
        .iter()
        .map(|kind| format!("{}.{}", kind, name))
        .find(|candidate| node_map.contains_key(candidate))
        .unwrap_or_else(|| format!("model.{}", name))
}
/// Extracts the model name from an exposure `depends_on` entry.
///
/// Accepts `ref('name')` and `ref("name")`, with optional surrounding
/// whitespace. For the two-argument form `ref('package', 'name')`, the final
/// positional argument (`name`) is returned. Keyword arguments such as
/// `v=2` are ignored.
///
/// Returns `None` for anything that is not a `ref(...)` call (for example
/// `source(...)` entries) and for a `ref()` that names nothing.
fn parse_exposure_ref(dep: &str) -> Option<String> {
    let args = dep.trim().strip_prefix("ref(")?.trim_end_matches(')');
    // The model name is the last positional argument. Skip empty pieces and
    // `key=value` keyword arguments.
    let name = args
        .split(',')
        .map(str::trim)
        .rfind(|arg| !arg.is_empty() && !arg.contains('='))?
        .trim_matches(|c: char| c == '\'' || c == '"')
        .trim();
    if name.is_empty() {
        None
    } else {
        Some(name.to_string())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a lookup table holding one freshly created node, keyed by that
    /// node's unique id.
    fn node_map_with(
        unique_id: &str,
        label: &str,
        node_type: NodeType,
    ) -> HashMap<String, NodeIndex> {
        let mut graph = LineageGraph::new();
        let idx = graph.add_node(NodeData {
            unique_id: unique_id.to_string(),
            label: label.to_string(),
            node_type,
            file_path: None,
            description: None,
        });
        let mut node_map = HashMap::new();
        node_map.insert(unique_id.to_string(), idx);
        node_map
    }

    #[test]
    fn test_resolve_ref_model() {
        let node_map = node_map_with("model.orders", "orders", NodeType::Model);
        assert_eq!(resolve_ref("orders", &node_map), "model.orders");
    }

    #[test]
    fn test_resolve_ref_seed() {
        let node_map = node_map_with("seed.countries", "countries", NodeType::Seed);
        assert_eq!(resolve_ref("countries", &node_map), "seed.countries");
    }

    #[test]
    fn test_parse_exposure_ref() {
        let expected = Some(String::from("orders"));
        assert_eq!(parse_exposure_ref("ref('orders')"), expected);
        assert_eq!(parse_exposure_ref("ref(\"orders\")"), expected);
        assert_eq!(parse_exposure_ref("source('raw', 'orders')"), None);
    }
}