1use anyhow::Result;
2use petgraph::stable_graph::NodeIndex;
3use rayon::prelude::*;
4use std::collections::HashMap;
5use std::path::Path;
6
7use std::path::PathBuf;
8
9use crate::graph::types::{ExposureInfo, OwnerInfo};
10use crate::parser::cache;
11use crate::parser::columns::extract_select_columns;
12use crate::parser::discovery::DiscoveredFiles;
13use crate::parser::jinja::JinjaExtraction;
14use crate::parser::sql::{
15 RefCall, SourceCall, extract_all_with_vars, extract_refs_and_sources_with_vars, extract_sources,
16};
17use crate::parser::yaml_schema::{
18 ExposureDefinition, MetricDefinition, ModelDefinition, SavedQueryDefinition, SchemaFile,
19 SemanticModelDefinition, SnapshotDefinition, parse_schema_file,
20};
21
22fn load_macro_prefix(files: &DiscoveredFiles) -> String {
25 let sources: Vec<String> = files
26 .macro_sql_files
27 .iter()
28 .filter_map(|path| match std::fs::read_to_string(path) {
29 Ok(content) => Some(content),
30 Err(e) => {
31 crate::warn!("could not read macro file {}: {}", path.display(), e);
32 None
33 }
34 })
35 .collect();
36 crate::parser::jinja::build_macro_prefix(&sources)
37}
38
39use super::types::*;
40
/// Mutable state used while assembling a `LineageGraph`: the graph itself plus an
/// index from string ids to node indices.
struct GraphBuilder {
    /// The graph under construction.
    graph: LineageGraph,
    /// Maps each node's `unique_id` (and any alias registered through `add_alias`)
    /// to its node index. `add_node` overwrites an existing entry with the same id.
    node_map: HashMap<String, NodeIndex>,
}
46
47impl GraphBuilder {
48 fn new() -> Self {
49 Self {
50 graph: LineageGraph::new(),
51 node_map: HashMap::new(),
52 }
53 }
54
55 fn add_node(&mut self, data: NodeData) -> NodeIndex {
57 let idx = self.graph.add_node(data);
58 let unique_id = self.graph[idx].unique_id.clone();
59 self.node_map.insert(unique_id, idx);
60 idx
61 }
62
63 fn add_alias(&mut self, from: String, to: &str) {
70 let idx = if let Some(&existing) = self.node_map.get(to) {
71 existing
72 } else {
73 let label = to.strip_prefix("model.").unwrap_or(to).to_string();
74 self.add_node(NodeData {
75 unique_id: to.to_string(),
76 label,
77 node_type: NodeType::Phantom,
78 file_path: None,
79 description: None,
80 materialization: None,
81 tags: vec![],
82 columns: vec![],
83 exposure: None,
84 aliases: vec![],
85 })
86 };
87 if let std::collections::hash_map::Entry::Vacant(e) = self.node_map.entry(from.clone()) {
88 e.insert(idx);
89 self.graph[idx].aliases.push(from);
90 }
91 }
92
93 fn get_or_create_phantom_ref(
98 &mut self,
99 ref_name: &str,
100 version: Option<String>,
101 sql_path: &Path,
102 ) -> NodeIndex {
103 let dep_id = if let Some(ref v) = version {
104 format!("model.{}.v{}", ref_name, v)
105 } else {
106 resolve_ref(ref_name, &self.node_map)
107 };
108 if let Some(&idx) = self.node_map.get(&dep_id) {
109 return idx;
110 }
111 let display_name = match version.as_deref() {
112 Some(v) => format!("{}.v{}", ref_name, v),
113 None => ref_name.to_string(),
114 };
115 crate::warn!(
116 "unresolved ref '{}' in {}",
117 display_name,
118 sql_path.display()
119 );
120 let phantom_id = match version.as_deref() {
121 Some(v) => format!("model.{}.v{}", ref_name, v),
122 None => format!("model.{}", ref_name),
123 };
124 self.add_node(NodeData {
125 unique_id: phantom_id,
126 label: display_name,
127 node_type: NodeType::Phantom,
128 file_path: None,
129 description: None,
130 materialization: None,
131 tags: vec![],
132 columns: vec![],
133 exposure: None,
134 aliases: vec![],
135 })
136 }
137
138 fn get_or_create_phantom_source(
140 &mut self,
141 source_name: &str,
142 table_name: &str,
143 sql_path: &Path,
144 ) -> NodeIndex {
145 let source_id = format!("source.{}.{}", source_name, table_name);
146 if let Some(&idx) = self.node_map.get(&source_id) {
147 return idx;
148 }
149 crate::warn!(
150 "unresolved source '{}.{}' in {}",
151 source_name,
152 table_name,
153 sql_path.display()
154 );
155 let label = format!("{}.{}", source_name, table_name);
156 self.add_node(NodeData {
157 unique_id: source_id,
158 label,
159 node_type: NodeType::Phantom,
160 file_path: None,
161 description: None,
162 materialization: None,
163 tags: vec![],
164 columns: vec![],
165 exposure: None,
166 aliases: vec![],
167 })
168 }
169}
170
171fn read_file(path: &Path) -> Result<String> {
173 std::fs::read_to_string(path).map_err(|e| {
174 crate::error::DbtLineageError::FileReadError {
175 path: path.to_path_buf(),
176 source: e,
177 }
178 .into()
179 })
180}
181
/// Returns the file name of `path` without its final extension, or `"unknown"`
/// when there is no file name or it is not valid UTF-8.
fn file_stem_str(path: &Path) -> String {
    match path.file_stem().and_then(|stem| stem.to_str()) {
        Some(stem) => stem.to_owned(),
        None => "unknown".to_owned(),
    }
}
189
190fn add_source_nodes(
192 gb: &mut GraphBuilder,
193 schema: &crate::parser::yaml_schema::SchemaFile,
194 yaml_path: &Path,
195 project_dir: &Path,
196) {
197 let relative_path = yaml_path
198 .strip_prefix(project_dir)
199 .unwrap_or(yaml_path)
200 .to_path_buf();
201 for source_def in &schema.sources {
202 for table in &source_def.tables {
203 let unique_id = format!("source.{}.{}", source_def.name, table.name);
204 let label = format!("{}.{}", source_def.name, table.name);
205 gb.add_node(NodeData {
206 unique_id,
207 label,
208 node_type: NodeType::Source,
209 file_path: Some(relative_path.clone()),
210 description: table
211 .description
212 .clone()
213 .or_else(|| source_def.description.clone()),
214 materialization: None,
215 tags: vec![],
216 columns: vec![],
217 exposure: None,
218 aliases: vec![],
219 });
220 }
221 }
222}
223
/// Model metadata collected from YAML `models:` entries, later merged with the
/// model's SQL `config()` in `process_model_files`.
#[derive(Clone, Default)]
struct YamlModelMeta {
    /// The model's YAML `description`.
    description: Option<String>,
    /// `config.materialized` from YAML; used only when the SQL config sets none.
    materialization: Option<String>,
    /// Union of top-level `tags` and `config.tags`, sorted and deduplicated.
    tags: Vec<String>,
    /// Column names declared in YAML; preferred over SELECT-parsed columns when
    /// non-empty.
    columns: Vec<String>,
}
232
/// Everything later build stages need from the project's YAML schema files.
/// Paths stored alongside definitions are relative to the project directory
/// when possible.
struct YamlParseResult {
    /// YAML metadata keyed by (unversioned) model name.
    model_meta: HashMap<String, YamlModelMeta>,
    exposures: Vec<ExposureDefinition>,
    /// Every successfully parsed schema file with its path (used for generic tests).
    schemas: Vec<(SchemaFile, PathBuf)>,
    /// SQL file stem of a versioned model -> (versioned unique id, base model name).
    stem_to_versioned: HashMap<String, (String, String)>,
    /// `model.<name>` -> `model.<name>.v<latest>` for versioned models whose latest
    /// version could be resolved.
    version_aliases: HashMap<String, String>,
    /// Snapshots declared in YAML, with the declaring file.
    snapshot_defs: Vec<(SnapshotDefinition, PathBuf)>,
    /// Semantic-layer definitions, each with its declaring file.
    semantic_models: Vec<(SemanticModelDefinition, PathBuf)>,
    metrics: Vec<(MetricDefinition, PathBuf)>,
    saved_queries: Vec<(SavedQueryDefinition, PathBuf)>,
}
250
251#[allow(clippy::type_complexity)]
254fn build_version_maps(
255 model_def: &ModelDefinition,
256) -> (Vec<(String, (String, String))>, Option<(String, String)>) {
257 if model_def.versions.is_empty() {
258 return (vec![], None);
259 }
260 let name = &model_def.name;
261 let mut stem_entries: Vec<(String, (String, String))> = Vec::new();
262 for vspec in &model_def.versions {
263 let v_str = vspec.v_str();
264 let stem = vspec.sql_stem(name);
265 let unique_id = format!("model.{}.v{}", name, v_str);
266 stem_entries.push((stem, (unique_id, name.clone())));
267 }
268 let alias = model_def.resolved_latest_version_str().map(|lv_str| {
269 let unversioned_id = format!("model.{}", name);
270 let latest_versioned_id = format!("model.{}.v{}", name, lv_str);
271 (unversioned_id, latest_versioned_id)
272 });
273 (stem_entries, alias)
274}
275
276fn process_yaml_files(
279 gb: &mut GraphBuilder,
280 files: &DiscoveredFiles,
281 project_dir: &Path,
282) -> Result<YamlParseResult> {
283 let mut model_meta: HashMap<String, YamlModelMeta> = HashMap::new();
284 let mut exposures: Vec<ExposureDefinition> = Vec::new();
285 let mut schemas: Vec<(SchemaFile, PathBuf)> = Vec::new();
286 let mut stem_to_versioned: HashMap<String, (String, String)> = HashMap::new();
287 let mut version_aliases: HashMap<String, String> = HashMap::new();
288 let mut snapshot_defs: Vec<(SnapshotDefinition, PathBuf)> = Vec::new();
289 let mut semantic_models: Vec<(SemanticModelDefinition, PathBuf)> = Vec::new();
290 let mut metrics: Vec<(MetricDefinition, PathBuf)> = Vec::new();
291 let mut saved_queries: Vec<(SavedQueryDefinition, PathBuf)> = Vec::new();
292
293 let mut sorted_yaml_files = files.yaml_files.clone();
296 sorted_yaml_files.sort();
297
298 for yaml_path in &sorted_yaml_files {
299 let content = read_file(yaml_path)?;
300 let schema = match parse_schema_file(&content, Some(yaml_path.as_path())) {
301 Ok(s) => s,
302 Err(_) => continue,
303 };
304
305 add_source_nodes(gb, &schema, yaml_path, project_dir);
306
307 for model_def in &schema.models {
308 let mut meta = YamlModelMeta {
309 description: model_def.description.clone(),
310 columns: model_def.columns.iter().map(|c| c.name.clone()).collect(),
311 ..Default::default()
312 };
313 let mut tags = model_def.tags.clone();
315 if let Some(cfg) = &model_def.config {
316 meta.materialization = cfg.materialized.clone();
317 tags.extend(cfg.tags.clone());
318 }
319 tags.sort();
320 tags.dedup();
321 meta.tags = tags;
322 model_meta.insert(model_def.name.clone(), meta);
323
324 let (stem_entries, alias) = build_version_maps(model_def);
326 for (stem, entry) in stem_entries {
327 stem_to_versioned.entry(stem).or_insert(entry);
328 }
329 if let Some((unversioned_id, latest_versioned_id)) = alias {
330 version_aliases
331 .entry(unversioned_id)
332 .or_insert(latest_versioned_id);
333 }
334 }
335
336 exposures.extend(schema.exposures.iter().cloned());
337
338 let relative_path = yaml_path
339 .strip_prefix(project_dir)
340 .unwrap_or(yaml_path)
341 .to_path_buf();
342
343 semantic_models.extend(
344 schema
345 .semantic_models
346 .iter()
347 .cloned()
348 .map(|sm| (sm, relative_path.clone())),
349 );
350 metrics.extend(
351 schema
352 .metrics
353 .iter()
354 .cloned()
355 .map(|m| (m, relative_path.clone())),
356 );
357 saved_queries.extend(
358 schema
359 .saved_queries
360 .iter()
361 .cloned()
362 .map(|sq| (sq, relative_path.clone())),
363 );
364
365 for snap_def in &schema.snapshots {
366 snapshot_defs.push((snap_def.clone(), relative_path.clone()));
367 }
368 schemas.push((schema, relative_path));
369 }
370
371 Ok(YamlParseResult {
372 model_meta,
373 exposures,
374 schemas,
375 stem_to_versioned,
376 version_aliases,
377 snapshot_defs,
378 semantic_models,
379 metrics,
380 saved_queries,
381 })
382}
383
/// In-memory refs/sources per model SQL path, produced by `process_model_files`
/// and reused by `process_sql_edges` to avoid re-parsing.
/// (Distinct from the on-disk `cache::ExtractionCache`.)
type ExtractionCache = HashMap<PathBuf, (Vec<RefCall>, Vec<SourceCall>)>;

/// Per-file result of the parallel parsing phase in `process_model_files`.
struct ModelExtraction {
    sql_path: PathBuf,
    /// File stem of `sql_path`.
    model_name: String,
    /// `None` only when the file had no disk-cache entry and could not be read.
    extraction: Option<JinjaExtraction>,
    /// Columns parsed from the SQL SELECT list (empty if the file was unreadable).
    columns: Vec<String>,
    /// True when `extraction` came from the disk cache, so it is not re-inserted.
    from_cache: bool,
}
397
398#[allow(clippy::too_many_arguments)]
407fn process_model_files(
408 gb: &mut GraphBuilder,
409 files: &DiscoveredFiles,
410 project_dir: &Path,
411 model_meta: &HashMap<String, YamlModelMeta>,
412 macro_prefix: &str,
413 disk_cache: &mut cache::ExtractionCache,
414 vars: &HashMap<String, serde_json::Value>,
415 stem_to_versioned: &HashMap<String, (String, String)>,
416) -> ExtractionCache {
417 let cache_ref = &*disk_cache;
420 let extractions: Vec<ModelExtraction> = files
421 .model_sql_files
422 .par_iter()
423 .map(|sql_path| {
424 let model_name = file_stem_str(sql_path);
425
426 if let Some(cached) = cache_ref.get(sql_path, project_dir) {
428 let sql_content = std::fs::read_to_string(sql_path).ok();
429 let columns = sql_content
430 .as_ref()
431 .map(|content| extract_select_columns(content))
432 .unwrap_or_default();
433 return ModelExtraction {
434 sql_path: sql_path.clone(),
435 model_name,
436 extraction: Some(cached.clone()),
437 columns,
438 from_cache: true,
439 };
440 }
441
442 let sql_content = std::fs::read_to_string(sql_path).ok();
443
444 let extraction = sql_content
445 .as_ref()
446 .map(|content| extract_all_with_vars(content, macro_prefix, vars));
447
448 let columns = sql_content
449 .as_ref()
450 .map(|content| extract_select_columns(content))
451 .unwrap_or_default();
452
453 ModelExtraction {
454 sql_path: sql_path.clone(),
455 model_name,
456 extraction,
457 columns,
458 from_cache: false,
459 }
460 })
461 .collect();
462
463 let mut model_name_paths: HashMap<String, std::path::PathBuf> = HashMap::new();
465 let mut mem_cache: ExtractionCache = HashMap::new();
466
467 for me in extractions {
468 if let Some(existing_path) = model_name_paths.get(&me.model_name) {
469 crate::warn!(
470 "duplicate model name '{}' in {} and {}",
471 me.model_name,
472 existing_path.display(),
473 me.sql_path.display()
474 );
475 }
476 model_name_paths.insert(me.model_name.clone(), me.sql_path.clone());
477
478 let from_cache = me.from_cache;
479 let (sql_config, cached_refs_sources) = match me.extraction {
480 Some(ext) => {
481 if !from_cache {
483 disk_cache.insert(&me.sql_path, project_dir, &ext);
484 }
485 (ext.config, Some((ext.refs, ext.sources)))
486 }
487 None => (Default::default(), None),
488 };
489
490 if let Some(rs) = cached_refs_sources {
491 mem_cache.insert(me.sql_path.clone(), rs);
492 }
493
494 let (unique_id, label, meta_key) =
496 if let Some((versioned_id, base_name)) = stem_to_versioned.get(&me.model_name) {
497 (
498 versioned_id.clone(),
499 versioned_id
501 .strip_prefix("model.")
502 .unwrap_or(versioned_id)
503 .to_string(),
504 base_name.as_str(),
505 )
506 } else {
507 let uid = format!("model.{}", me.model_name);
508 (uid, me.model_name.clone(), me.model_name.as_str())
509 };
510
511 let yaml_meta = model_meta.get(meta_key);
512
513 let materialization = sql_config
514 .materialized
515 .or_else(|| yaml_meta.and_then(|m| m.materialization.clone()));
516
517 let mut tags = sql_config.tags;
518 if let Some(meta) = yaml_meta {
519 tags.extend(meta.tags.clone());
520 }
521 tags.sort();
522 tags.dedup();
523
524 let relative_path = me
525 .sql_path
526 .strip_prefix(project_dir)
527 .unwrap_or(&me.sql_path)
528 .to_path_buf();
529
530 let columns = match yaml_meta {
532 Some(m) if !m.columns.is_empty() => m.columns.clone(),
533 _ => me.columns,
534 };
535
536 gb.add_node(NodeData {
537 unique_id,
538 label,
539 node_type: NodeType::Model,
540 file_path: Some(relative_path),
541 description: yaml_meta.and_then(|m| m.description.clone()),
542 materialization,
543 tags,
544 columns,
545 exposure: None,
546 aliases: vec![],
547 });
548 }
549
550 mem_cache
551}
552
553fn process_simple_nodes(
555 gb: &mut GraphBuilder,
556 paths: &[std::path::PathBuf],
557 project_dir: &Path,
558 prefix: &str,
559 node_type: NodeType,
560) {
561 for path in paths {
562 let name = file_stem_str(path);
563 let unique_id = format!("{}.{}", prefix, name);
564 let relative_path = path.strip_prefix(project_dir).unwrap_or(path).to_path_buf();
565
566 gb.add_node(NodeData {
567 unique_id,
568 label: name,
569 node_type,
570 file_path: Some(relative_path),
571 description: None,
572 materialization: None,
573 tags: vec![],
574 columns: vec![],
575 exposure: None,
576 aliases: vec![],
577 });
578 }
579}
580
581fn process_sql_edges(
588 gb: &mut GraphBuilder,
589 files: &DiscoveredFiles,
590 project_dir: &Path,
591 macro_prefix: &str,
592 extraction_cache: &ExtractionCache,
593 vars: &HashMap<String, serde_json::Value>,
594 stem_to_versioned: &HashMap<String, (String, String)>,
595) -> Result<()> {
596 let all_sql_files: Vec<(&std::path::PathBuf, &str)> = files
597 .model_sql_files
598 .iter()
599 .map(|p| (p, "model"))
600 .chain(files.snapshot_sql_files.iter().map(|p| (p, "snapshot")))
601 .chain(files.test_sql_files.iter().map(|p| (p, "test")))
602 .collect();
603
604 for (sql_path, file_type) in &all_sql_files {
605 let node_name = file_stem_str(sql_path);
606 let node_unique_id = format!("{}.{}", file_type, node_name);
607
608 if *file_type == "test" {
610 let relative_path = sql_path
611 .strip_prefix(project_dir)
612 .unwrap_or(sql_path)
613 .to_path_buf();
614 gb.add_node(NodeData {
615 unique_id: node_unique_id.clone(),
616 label: node_name.clone(),
617 node_type: NodeType::Test,
618 file_path: Some(relative_path),
619 description: None,
620 materialization: None,
621 tags: vec![],
622 columns: vec![],
623 exposure: None,
624 aliases: vec![],
625 });
626 }
627
628 let current_idx = if *file_type == "model" {
632 let lookup_id = stem_to_versioned
633 .get(&node_name)
634 .map(|(versioned_id, _)| versioned_id.as_str())
635 .unwrap_or(&node_unique_id);
636 match gb.node_map.get(lookup_id) {
637 Some(&idx) => idx,
638 None => continue,
639 }
640 } else {
641 match gb.node_map.get(&node_unique_id) {
642 Some(&idx) => idx,
643 None => continue,
644 }
645 };
646
647 let owned;
649 let (refs, sources) = if let Some(cached) = extraction_cache.get(*sql_path) {
650 (&cached.0, &cached.1)
651 } else {
652 let content = read_file(sql_path)?;
653 owned = extract_refs_and_sources_with_vars(&content, macro_prefix, vars);
654 (&owned.0, &owned.1)
655 };
656
657 let is_test = *file_type == "test";
660
661 for ref_call in refs {
662 let dep_idx =
663 gb.get_or_create_phantom_ref(&ref_call.name, ref_call.version.clone(), sql_path);
664 let edge_type = if is_test {
665 EdgeType::Test
666 } else {
667 EdgeType::Ref
668 };
669 gb.graph
670 .add_edge(dep_idx, current_idx, EdgeData::direct(edge_type));
671 }
672
673 for source_call in sources {
674 let source_idx = gb.get_or_create_phantom_source(
675 &source_call.source_name,
676 &source_call.table_name,
677 sql_path,
678 );
679 let edge_type = if is_test {
680 EdgeType::Test
681 } else {
682 EdgeType::Source
683 };
684 gb.graph
685 .add_edge(source_idx, current_idx, EdgeData::direct(edge_type));
686 }
687 }
688
689 Ok(())
690}
691
692fn process_exposures(gb: &mut GraphBuilder, exposures: &[ExposureDefinition]) {
694 for exposure in exposures {
695 let unique_id = format!("exposure.{}", exposure.name);
696 let idx = gb.add_node(NodeData {
697 unique_id,
698 label: exposure.name.clone(),
699 node_type: NodeType::Exposure,
700 file_path: None,
701 description: exposure.description.clone(),
702 materialization: None,
703 tags: vec![],
704 columns: vec![],
705 exposure: Some(ExposureInfo {
706 label: exposure.label.clone(),
707 exposure_type: exposure.exposure_type.clone(),
708 url: exposure.url.clone(),
709 maturity: exposure.maturity.clone(),
710 owner: exposure.owner.as_ref().map(|o| OwnerInfo {
711 name: o.name.as_ref().filter(|s| !s.trim().is_empty()).cloned(),
712 email: o.email.as_ref().filter(|s| !s.trim().is_empty()).cloned(),
713 }),
714 }),
715 aliases: vec![],
716 });
717
718 for dep in &exposure.depends_on {
719 if let Some((model_name, version)) = parse_exposure_ref(dep) {
720 let dep_id = if let Some(ref v) = version {
721 format!("model.{}.v{}", model_name, v)
722 } else {
723 resolve_ref(&model_name, &gb.node_map)
724 };
725 if let Some(&dep_idx) = gb.node_map.get(&dep_id) {
726 gb.graph
727 .add_edge(dep_idx, idx, EdgeData::direct(EdgeType::Exposure));
728 }
729 }
730 }
731 }
732}
733
734fn dedup_unique_id(
739 candidate: &str,
740 node_map: &HashMap<String, NodeIndex>,
741) -> (String, Option<String>) {
742 if !node_map.contains_key(candidate) {
743 return (candidate.to_string(), None);
744 }
745 let mut n = 2u32;
746 loop {
747 let suffix = format!("_{}", n);
748 let suffixed = format!("{}{}", candidate, suffix);
749 if !node_map.contains_key(&suffixed) {
750 return (suffixed, Some(suffix));
751 }
752 n += 1;
753 }
754}
755
756fn add_generic_test_node(
758 gb: &mut GraphBuilder,
759 parent_idx: NodeIndex,
760 unique_id: String,
761 label: String,
762 file_path: Option<PathBuf>,
763) {
764 let idx = gb.add_node(NodeData {
765 unique_id,
766 label,
767 node_type: NodeType::Test,
768 file_path,
769 description: None,
770 materialization: None,
771 tags: vec![],
772 columns: vec![],
773 exposure: None,
774 aliases: vec![],
775 });
776 gb.graph
777 .add_edge(parent_idx, idx, EdgeData::direct(EdgeType::Test));
778}
779
780fn process_generic_tests(gb: &mut GraphBuilder, schemas: &[(SchemaFile, PathBuf)]) {
783 for (schema, yaml_path) in schemas {
784 let file_path = Some(yaml_path.clone());
785
786 for model_def in &schema.models {
790 let parent_id = format!("model.{}", model_def.name);
791 let parent_idx = match gb.node_map.get(&parent_id) {
792 Some(&idx) => idx,
793 None => continue,
794 };
795
796 for test_def in &model_def.tests {
798 let test_name = match test_def.test_name() {
799 Some(name) => name,
800 None => continue,
801 };
802 let candidate = format!("test.{}.{}", test_name, model_def.name);
803 let (unique_id, suffix) = dedup_unique_id(&candidate, &gb.node_map);
804 let mut label = format!("{}_{}", test_name, model_def.name);
805 if let Some(s) = suffix {
806 label.push_str(&s);
807 }
808 add_generic_test_node(gb, parent_idx, unique_id, label, file_path.clone());
809 }
810
811 for col in &model_def.columns {
813 for test_def in &col.tests {
814 let test_name = match test_def.test_name() {
815 Some(name) => name,
816 None => continue,
817 };
818 let candidate = format!("test.{}.{}.{}", test_name, model_def.name, col.name);
819 let (unique_id, suffix) = dedup_unique_id(&candidate, &gb.node_map);
820 let mut label = format!("{}_{}_{}", test_name, model_def.name, col.name);
821 if let Some(s) = suffix {
822 label.push_str(&s);
823 }
824 add_generic_test_node(gb, parent_idx, unique_id, label, file_path.clone());
825 }
826 }
827 }
828
829 for source_def in &schema.sources {
831 for table in &source_def.tables {
832 let parent_id = format!("source.{}.{}", source_def.name, table.name);
833 let parent_idx = match gb.node_map.get(&parent_id) {
834 Some(&idx) => idx,
835 None => continue,
836 };
837 for col in &table.columns {
838 for test_def in &col.tests {
839 let test_name = match test_def.test_name() {
840 Some(name) => name,
841 None => continue,
842 };
843 let candidate = format!(
844 "test.{}.{}.{}.{}",
845 test_name, source_def.name, table.name, col.name
846 );
847 let (unique_id, suffix) = dedup_unique_id(&candidate, &gb.node_map);
848 let mut label = format!(
849 "{}_{}_{}_{}",
850 test_name, source_def.name, table.name, col.name
851 );
852 if let Some(s) = suffix {
853 label.push_str(&s);
854 }
855 add_generic_test_node(gb, parent_idx, unique_id, label, file_path.clone());
856 }
857 }
858 }
859 }
860 }
861}
862
863fn process_yaml_snapshot_nodes(
871 gb: &mut GraphBuilder,
872 snapshot_defs: &[(SnapshotDefinition, PathBuf)],
873) {
874 let mut yaml_registered = std::collections::HashSet::<String>::new();
879 for (snap_def, yaml_path) in snapshot_defs {
880 let unique_id = format!("snapshot.{}", snap_def.name);
881 if gb.node_map.contains_key(&unique_id) {
882 continue;
883 }
884 gb.add_node(NodeData {
885 unique_id: unique_id.clone(),
886 label: snap_def.name.clone(),
887 node_type: NodeType::Snapshot,
888 file_path: Some(yaml_path.clone()),
889 description: snap_def.description.clone(),
890 materialization: None,
891 tags: vec![],
892 columns: vec![],
893 exposure: None,
894 aliases: vec![],
895 });
896 yaml_registered.insert(unique_id);
897 }
898
899 for (snap_def, yaml_path) in snapshot_defs {
901 let unique_id = format!("snapshot.{}", snap_def.name);
902 if !yaml_registered.remove(&unique_id) {
903 continue;
904 }
905 let Some(&snap_idx) = gb.node_map.get(&unique_id) else {
906 continue;
907 };
908 if let Some(relation) = &snap_def.relation {
909 if let Some((source_name, table_name)) = parse_relation_source(relation) {
910 let dep_idx =
911 gb.get_or_create_phantom_source(&source_name, &table_name, yaml_path.as_path());
912 gb.graph
913 .add_edge(dep_idx, snap_idx, EdgeData::direct(EdgeType::Source));
914 } else if let Some((model_name, version)) = parse_exposure_ref(relation) {
915 let dep_idx =
916 gb.get_or_create_phantom_ref(&model_name, version, yaml_path.as_path());
917 gb.graph
918 .add_edge(dep_idx, snap_idx, EdgeData::direct(EdgeType::Ref));
919 }
920 }
921 }
922}
923
924fn process_semantic_layer(
938 gb: &mut GraphBuilder,
939 semantic_models: &[(SemanticModelDefinition, PathBuf)],
940 metrics: &[(MetricDefinition, PathBuf)],
941 saved_queries: &[(SavedQueryDefinition, PathBuf)],
942) {
943 for (sm, yaml_path) in semantic_models {
945 let unique_id = format!("semantic_model.{}", sm.name);
946 if gb.node_map.contains_key(&unique_id) {
947 continue;
948 }
949 gb.add_node(NodeData {
950 unique_id,
951 label: sm.label.as_deref().unwrap_or(&sm.name).to_string(),
952 node_type: NodeType::SemanticModel,
953 file_path: Some(yaml_path.clone()),
954 description: sm.description.clone(),
955 materialization: None,
956 tags: vec![],
957 columns: vec![],
958 exposure: None,
959 aliases: vec![],
960 });
961 }
962
963 let mut measure_to_sem: HashMap<String, String> = HashMap::new();
965 for (sm, yaml_path) in semantic_models {
966 let sem_id = format!("semantic_model.{}", sm.name);
967 let Some(&sem_idx) = gb.node_map.get(&sem_id) else {
968 continue;
969 };
970 for measure in &sm.measures {
971 if let Some(existing) = measure_to_sem.get(&measure.name) {
972 if existing != &sm.name {
973 crate::warn!(
974 "measure '{}' defined in both semantic_model '{}' and '{}'; \
975 linking metrics to '{}'",
976 measure.name,
977 existing,
978 sm.name,
979 existing
980 );
981 }
982 } else {
983 measure_to_sem.insert(measure.name.clone(), sm.name.clone());
984 }
985 }
986 if let Some(model_ref) = &sm.model
988 && let Some((model_name, version)) = parse_exposure_ref(model_ref)
989 {
990 let dep_idx = gb.get_or_create_phantom_ref(&model_name, version, yaml_path.as_path());
991 gb.graph
992 .add_edge(dep_idx, sem_idx, EdgeData::direct(EdgeType::Ref));
993 }
994 }
995
996 for (metric, yaml_path) in metrics {
998 let unique_id = format!("metric.{}", metric.name);
999 if gb.node_map.contains_key(&unique_id) {
1000 continue;
1001 }
1002 gb.add_node(NodeData {
1003 unique_id,
1004 label: metric.label.as_deref().unwrap_or(&metric.name).to_string(),
1005 node_type: NodeType::Metric,
1006 file_path: Some(yaml_path.clone()),
1007 description: metric.description.clone(),
1008 materialization: None,
1009 tags: vec![],
1010 columns: vec![],
1011 exposure: None,
1012 aliases: vec![],
1013 });
1014 }
1015
1016 for (metric, yaml_path) in metrics {
1018 let metric_id = format!("metric.{}", metric.name);
1019 let Some(&metric_idx) = gb.node_map.get(&metric_id) else {
1020 continue;
1021 };
1022 let mut seen_sem_indices = std::collections::HashSet::new();
1027 for measure_name in metric.measure_refs() {
1028 let Some(sem_name) = measure_to_sem.get(measure_name) else {
1029 continue;
1030 };
1031 let sem_id = format!("semantic_model.{}", sem_name);
1032 let Some(&sem_idx) = gb.node_map.get(&sem_id) else {
1033 continue;
1034 };
1035 if seen_sem_indices.insert(sem_idx) {
1036 gb.graph
1037 .add_edge(sem_idx, metric_idx, EdgeData::direct(EdgeType::Ref));
1038 }
1039 }
1040 let mut seen_metric_refs = std::collections::HashSet::new();
1043 for dep_metric_name in metric.metric_refs() {
1044 if !seen_metric_refs.insert(dep_metric_name) {
1045 continue;
1046 }
1047 let dep_id = format!("metric.{}", dep_metric_name);
1048 let dep_idx = if let Some(&idx) = gb.node_map.get(&dep_id) {
1049 idx
1050 } else {
1051 crate::warn!(
1052 "unresolved metric ref '{}' from metric '{}'",
1053 dep_metric_name,
1054 metric.name
1055 );
1056 gb.add_node(NodeData {
1057 unique_id: dep_id,
1058 label: dep_metric_name.to_string(),
1059 node_type: NodeType::Phantom,
1060 file_path: Some(yaml_path.clone()),
1061 description: None,
1062 materialization: None,
1063 tags: vec![],
1064 columns: vec![],
1065 exposure: None,
1066 aliases: vec![],
1067 })
1068 };
1069 gb.graph
1070 .add_edge(dep_idx, metric_idx, EdgeData::direct(EdgeType::Ref));
1071 }
1072 }
1073
1074 for (sq, yaml_path) in saved_queries {
1076 let sq_id = format!("saved_query.{}", sq.name);
1077 if gb.node_map.contains_key(&sq_id) {
1078 continue;
1079 }
1080 let sq_idx = gb.add_node(NodeData {
1081 unique_id: sq_id.clone(),
1082 label: sq.label.as_deref().unwrap_or(&sq.name).to_string(),
1083 node_type: NodeType::SavedQuery,
1084 file_path: Some(yaml_path.clone()),
1085 description: sq.description.clone(),
1086 materialization: None,
1087 tags: vec![],
1088 columns: vec![],
1089 exposure: None,
1090 aliases: vec![],
1091 });
1092 if let Some(qp) = &sq.query_params {
1093 for metric_name in &qp.metrics {
1094 let metric_dep_id = format!("metric.{}", metric_name);
1095 let dep_idx = if let Some(&idx) = gb.node_map.get(&metric_dep_id) {
1096 idx
1097 } else {
1098 crate::warn!(
1099 "unresolved metric ref '{}' in saved_query '{}'",
1100 metric_name,
1101 sq.name
1102 );
1103 gb.add_node(NodeData {
1104 unique_id: metric_dep_id,
1105 label: metric_name.clone(),
1106 node_type: NodeType::Phantom,
1107 file_path: Some(yaml_path.clone()),
1108 description: None,
1109 materialization: None,
1110 tags: vec![],
1111 columns: vec![],
1112 exposure: None,
1113 aliases: vec![],
1114 })
1115 };
1116 gb.graph
1117 .add_edge(dep_idx, sq_idx, EdgeData::direct(EdgeType::Ref));
1118 }
1119 }
1120 }
1121}
1122
1123pub fn build_graph(
1130 project_dir: &Path,
1131 files: &DiscoveredFiles,
1132 cache_dir: Option<&Path>,
1133 no_cache: bool,
1134 refresh_cache: bool,
1135 vars: &HashMap<String, serde_json::Value>,
1136) -> Result<LineageGraph> {
1137 let mut gb = GraphBuilder::new();
1138 let macro_prefix = load_macro_prefix(files);
1139 let mut disk_cache = if no_cache {
1140 cache::ExtractionCache::disabled()
1141 } else if refresh_cache {
1142 cache::ExtractionCache::fresh(project_dir, ¯o_prefix, vars, cache_dir)
1143 } else {
1144 cache::ExtractionCache::load(project_dir, ¯o_prefix, vars, cache_dir)
1145 };
1146
1147 let yaml_result = process_yaml_files(&mut gb, files, project_dir)?;
1148 let extraction_cache = process_model_files(
1149 &mut gb,
1150 files,
1151 project_dir,
1152 &yaml_result.model_meta,
1153 ¯o_prefix,
1154 &mut disk_cache,
1155 vars,
1156 &yaml_result.stem_to_versioned,
1157 );
1158 for (unversioned_id, latest_versioned_id) in &yaml_result.version_aliases {
1161 gb.add_alias(unversioned_id.clone(), latest_versioned_id);
1162 }
1163 process_simple_nodes(
1164 &mut gb,
1165 &files.seed_files,
1166 project_dir,
1167 "seed",
1168 NodeType::Seed,
1169 );
1170 process_simple_nodes(
1171 &mut gb,
1172 &files.snapshot_sql_files,
1173 project_dir,
1174 "snapshot",
1175 NodeType::Snapshot,
1176 );
1177 process_yaml_snapshot_nodes(&mut gb, &yaml_result.snapshot_defs);
1178 process_sql_edges(
1179 &mut gb,
1180 files,
1181 project_dir,
1182 ¯o_prefix,
1183 &extraction_cache,
1184 vars,
1185 &yaml_result.stem_to_versioned,
1186 )?;
1187 process_exposures(&mut gb, &yaml_result.exposures);
1188 process_generic_tests(&mut gb, &yaml_result.schemas);
1189 process_semantic_layer(
1190 &mut gb,
1191 &yaml_result.semantic_models,
1192 &yaml_result.metrics,
1193 &yaml_result.saved_queries,
1194 );
1195
1196 disk_cache.save();
1197
1198 Ok(gb.graph)
1199}
1200
1201fn resolve_ref(name: &str, node_map: &HashMap<String, NodeIndex>) -> String {
1203 let model_id = format!("model.{}", name);
1205 if node_map.contains_key(&model_id) {
1206 return model_id;
1207 }
1208
1209 let seed_id = format!("seed.{}", name);
1210 if node_map.contains_key(&seed_id) {
1211 return seed_id;
1212 }
1213
1214 let snapshot_id = format!("snapshot.{}", name);
1215 if node_map.contains_key(&snapshot_id) {
1216 return snapshot_id;
1217 }
1218
1219 model_id
1221}
1222
1223fn parse_relation_source(relation: &str) -> Option<(String, String)> {
1226 let wrapped = format!("{{{{ {} }}}}", relation.trim());
1227 extract_sources(&wrapped)
1228 .into_iter()
1229 .next()
1230 .map(|s| (s.source_name, s.table_name))
1231}
1232
1233fn parse_exposure_ref(dep: &str) -> Option<(String, Option<String>)> {
1237 let dep = dep.trim();
1238 if dep.starts_with("ref(") {
1239 let wrapped = format!("{{{{ {} }}}}", dep);
1241 let refs = crate::parser::sql::extract_refs(&wrapped);
1242 refs.into_iter()
1244 .next()
1245 .filter(|r| r.package.is_none())
1246 .map(|r| (r.name, r.version))
1247 } else {
1248 None
1250 }
1251}
1252
// Unit tests for the graph builder live in the `tests` submodule file.
#[cfg(test)]
mod tests;