Skip to main content

dlin_core/graph/builder/
mod.rs

1use anyhow::Result;
2use petgraph::stable_graph::NodeIndex;
3use rayon::prelude::*;
4use std::collections::HashMap;
5use std::path::Path;
6
7use std::path::PathBuf;
8
9use crate::graph::types::{ExposureInfo, OwnerInfo};
10use crate::parser::cache;
11use crate::parser::columns::extract_select_columns;
12use crate::parser::discovery::DiscoveredFiles;
13use crate::parser::jinja::JinjaExtraction;
14use crate::parser::sql::{
15    RefCall, SourceCall, extract_all_with_vars, extract_refs_and_sources_with_vars, extract_sources,
16};
17use crate::parser::yaml_schema::{
18    ExposureDefinition, MetricDefinition, ModelDefinition, SavedQueryDefinition, SchemaFile,
19    SemanticModelDefinition, SnapshotDefinition, parse_schema_file,
20};
21
22/// Read all macro SQL files, filter out unparseable ones, and return a
23/// pre-built prefix string for prepending to model templates.
24fn load_macro_prefix(files: &DiscoveredFiles) -> String {
25    let sources: Vec<String> = files
26        .macro_sql_files
27        .iter()
28        .filter_map(|path| match std::fs::read_to_string(path) {
29            Ok(content) => Some(content),
30            Err(e) => {
31                crate::warn!("could not read macro file {}: {}", path.display(), e);
32                None
33            }
34        })
35        .collect();
36    crate::parser::jinja::build_macro_prefix(&sources)
37}
38
39use super::types::*;
40
41/// Shared state threaded through the build_graph helper functions
42struct GraphBuilder {
43    graph: LineageGraph,
44    node_map: HashMap<String, NodeIndex>,
45}
46
47impl GraphBuilder {
48    fn new() -> Self {
49        Self {
50            graph: LineageGraph::new(),
51            node_map: HashMap::new(),
52        }
53    }
54
55    /// Add a node and register it in the node map
56    fn add_node(&mut self, data: NodeData) -> NodeIndex {
57        let idx = self.graph.add_node(data);
58        let unique_id = self.graph[idx].unique_id.clone();
59        self.node_map.insert(unique_id, idx);
60        idx
61    }
62
63    /// Register a node_map alias: `from` → same NodeIndex as `to`.
64    /// When `to` is not yet in the map (e.g. its SQL file is missing), a Phantom
65    /// node is created for `to` so that unversioned lookups still resolve to the
66    /// intended versioned unique_id rather than falling back to a generic phantom.
67    /// Also records `from` in the target node's `aliases` list so that the alias
68    /// survives after `build_graph` discards the node_map.
69    fn add_alias(&mut self, from: String, to: &str) {
70        let idx = if let Some(&existing) = self.node_map.get(to) {
71            existing
72        } else {
73            let label = to.strip_prefix("model.").unwrap_or(to).to_string();
74            self.add_node(NodeData {
75                unique_id: to.to_string(),
76                label,
77                node_type: NodeType::Phantom,
78                file_path: None,
79                description: None,
80                materialization: None,
81                tags: vec![],
82                columns: vec![],
83                exposure: None,
84                aliases: vec![],
85            })
86        };
87        if let std::collections::hash_map::Entry::Vacant(e) = self.node_map.entry(from.clone()) {
88            e.insert(idx);
89            self.graph[idx].aliases.push(from);
90        }
91    }
92
93    /// Get or create a phantom ref node, returning its index.
94    /// When `version` is `Some(N)`, resolves only `model.{name}.v{N}` — never
95    /// falls back to the unversioned alias so that version-pinned refs don't
96    /// silently link to the wrong version.
97    fn get_or_create_phantom_ref(
98        &mut self,
99        ref_name: &str,
100        version: Option<String>,
101        sql_path: &Path,
102    ) -> NodeIndex {
103        let dep_id = if let Some(ref v) = version {
104            format!("model.{}.v{}", ref_name, v)
105        } else {
106            resolve_ref(ref_name, &self.node_map)
107        };
108        if let Some(&idx) = self.node_map.get(&dep_id) {
109            return idx;
110        }
111        let display_name = match version.as_deref() {
112            Some(v) => format!("{}.v{}", ref_name, v),
113            None => ref_name.to_string(),
114        };
115        crate::warn!(
116            "unresolved ref '{}' in {}",
117            display_name,
118            sql_path.display()
119        );
120        let phantom_id = match version.as_deref() {
121            Some(v) => format!("model.{}.v{}", ref_name, v),
122            None => format!("model.{}", ref_name),
123        };
124        self.add_node(NodeData {
125            unique_id: phantom_id,
126            label: display_name,
127            node_type: NodeType::Phantom,
128            file_path: None,
129            description: None,
130            materialization: None,
131            tags: vec![],
132            columns: vec![],
133            exposure: None,
134            aliases: vec![],
135        })
136    }
137
138    /// Get or create a phantom source node, returning its index
139    fn get_or_create_phantom_source(
140        &mut self,
141        source_name: &str,
142        table_name: &str,
143        sql_path: &Path,
144    ) -> NodeIndex {
145        let source_id = format!("source.{}.{}", source_name, table_name);
146        if let Some(&idx) = self.node_map.get(&source_id) {
147            return idx;
148        }
149        crate::warn!(
150            "unresolved source '{}.{}' in {}",
151            source_name,
152            table_name,
153            sql_path.display()
154        );
155        let label = format!("{}.{}", source_name, table_name);
156        self.add_node(NodeData {
157            unique_id: source_id,
158            label,
159            node_type: NodeType::Phantom,
160            file_path: None,
161            description: None,
162            materialization: None,
163            tags: vec![],
164            columns: vec![],
165            exposure: None,
166            aliases: vec![],
167        })
168    }
169}
170
171/// Read a file with a descriptive error
172fn read_file(path: &Path) -> Result<String> {
173    std::fs::read_to_string(path).map_err(|e| {
174        crate::error::DbtLineageError::FileReadError {
175            path: path.to_path_buf(),
176            source: e,
177        }
178        .into()
179    })
180}
181
/// Return the file stem of `path` (the file name minus its final extension)
/// as an owned `String`.
///
/// Falls back to the literal `"unknown"` when the path has no file name
/// (e.g. `""`, `"/"`, or a trailing `..`) or the stem is not valid UTF-8.
fn file_stem_str(path: &Path) -> String {
    match path.file_stem().and_then(std::ffi::OsStr::to_str) {
        Some(stem) => stem.to_owned(),
        None => String::from("unknown"),
    }
}
189
190/// Create source nodes from a single schema file's source definitions
191fn add_source_nodes(
192    gb: &mut GraphBuilder,
193    schema: &crate::parser::yaml_schema::SchemaFile,
194    yaml_path: &Path,
195    project_dir: &Path,
196) {
197    let relative_path = yaml_path
198        .strip_prefix(project_dir)
199        .unwrap_or(yaml_path)
200        .to_path_buf();
201    for source_def in &schema.sources {
202        for table in &source_def.tables {
203            let unique_id = format!("source.{}.{}", source_def.name, table.name);
204            let label = format!("{}.{}", source_def.name, table.name);
205            gb.add_node(NodeData {
206                unique_id,
207                label,
208                node_type: NodeType::Source,
209                file_path: Some(relative_path.clone()),
210                description: table
211                    .description
212                    .clone()
213                    .or_else(|| source_def.description.clone()),
214                materialization: None,
215                tags: vec![],
216                columns: vec![],
217                exposure: None,
218                aliases: vec![],
219            });
220        }
221    }
222}
223
/// Model metadata gathered from a YAML `models:` entry, stored by
/// `process_yaml_files` keyed by model name.
///
/// `Debug`/`PartialEq`/`Eq` are derived so the value can be logged and compared
/// in diagnostics and tests.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct YamlModelMeta {
    /// The model's `description`, if any.
    description: Option<String>,
    /// `config.materialized`, if set.
    materialization: Option<String>,
    /// Sorted, de-duplicated union of model-level and config-level tags.
    tags: Vec<String>,
    /// Column names declared under `columns:`, in YAML order.
    columns: Vec<String>,
}
232
233/// Result of parsing YAML schema files.
234struct YamlParseResult {
235    model_meta: HashMap<String, YamlModelMeta>,
236    exposures: Vec<ExposureDefinition>,
237    /// Each SchemaFile paired with its YAML file relative path (for test node file_path).
238    schemas: Vec<(SchemaFile, PathBuf)>,
239    /// Maps SQL file stems to (versioned_unique_id, base_model_name).
240    stem_to_versioned: HashMap<String, (String, String)>,
241    /// Maps unversioned model IDs to the latest-version unique ID.
242    version_aliases: HashMap<String, String>,
243    /// YAML-only snapshot defs with their yaml file (relative) path.
244    snapshot_defs: Vec<(SnapshotDefinition, PathBuf)>,
245    /// Semantic layer defs paired with the relative YAML path they came from.
246    semantic_models: Vec<(SemanticModelDefinition, PathBuf)>,
247    metrics: Vec<(MetricDefinition, PathBuf)>,
248    saved_queries: Vec<(SavedQueryDefinition, PathBuf)>,
249}
250
251/// Build version maps for a single versioned model definition.
252/// Returns entries to add to `stem_to_versioned` and `version_aliases`.
253#[allow(clippy::type_complexity)]
254fn build_version_maps(
255    model_def: &ModelDefinition,
256) -> (Vec<(String, (String, String))>, Option<(String, String)>) {
257    if model_def.versions.is_empty() {
258        return (vec![], None);
259    }
260    let name = &model_def.name;
261    let mut stem_entries: Vec<(String, (String, String))> = Vec::new();
262    for vspec in &model_def.versions {
263        let v_str = vspec.v_str();
264        let stem = vspec.sql_stem(name);
265        let unique_id = format!("model.{}.v{}", name, v_str);
266        stem_entries.push((stem, (unique_id, name.clone())));
267    }
268    let alias = model_def.resolved_latest_version_str().map(|lv_str| {
269        let unversioned_id = format!("model.{}", name);
270        let latest_versioned_id = format!("model.{}.v{}", name, lv_str);
271        (unversioned_id, latest_versioned_id)
272    });
273    (stem_entries, alias)
274}
275
276/// Parse YAML schema files: create source nodes, collect model metadata, exposures,
277/// and parsed schemas (for generic test extraction).
278fn process_yaml_files(
279    gb: &mut GraphBuilder,
280    files: &DiscoveredFiles,
281    project_dir: &Path,
282) -> Result<YamlParseResult> {
283    let mut model_meta: HashMap<String, YamlModelMeta> = HashMap::new();
284    let mut exposures: Vec<ExposureDefinition> = Vec::new();
285    let mut schemas: Vec<(SchemaFile, PathBuf)> = Vec::new();
286    let mut stem_to_versioned: HashMap<String, (String, String)> = HashMap::new();
287    let mut version_aliases: HashMap<String, String> = HashMap::new();
288    let mut snapshot_defs: Vec<(SnapshotDefinition, PathBuf)> = Vec::new();
289    let mut semantic_models: Vec<(SemanticModelDefinition, PathBuf)> = Vec::new();
290    let mut metrics: Vec<(MetricDefinition, PathBuf)> = Vec::new();
291    let mut saved_queries: Vec<(SavedQueryDefinition, PathBuf)> = Vec::new();
292
293    // Sort YAML paths so that duplicate-test-ID suffixes (_2, _3, …) are
294    // deterministic across filesystems/OSes.
295    let mut sorted_yaml_files = files.yaml_files.clone();
296    sorted_yaml_files.sort();
297
298    for yaml_path in &sorted_yaml_files {
299        let content = read_file(yaml_path)?;
300        let schema = match parse_schema_file(&content, Some(yaml_path.as_path())) {
301            Ok(s) => s,
302            Err(_) => continue,
303        };
304
305        add_source_nodes(gb, &schema, yaml_path, project_dir);
306
307        for model_def in &schema.models {
308            let mut meta = YamlModelMeta {
309                description: model_def.description.clone(),
310                columns: model_def.columns.iter().map(|c| c.name.clone()).collect(),
311                ..Default::default()
312            };
313            // Merge tags from model-level and config-level
314            let mut tags = model_def.tags.clone();
315            if let Some(cfg) = &model_def.config {
316                meta.materialization = cfg.materialized.clone();
317                tags.extend(cfg.tags.clone());
318            }
319            tags.sort();
320            tags.dedup();
321            meta.tags = tags;
322            model_meta.insert(model_def.name.clone(), meta);
323
324            // Collect versioned model maps
325            let (stem_entries, alias) = build_version_maps(model_def);
326            for (stem, entry) in stem_entries {
327                stem_to_versioned.entry(stem).or_insert(entry);
328            }
329            if let Some((unversioned_id, latest_versioned_id)) = alias {
330                version_aliases
331                    .entry(unversioned_id)
332                    .or_insert(latest_versioned_id);
333            }
334        }
335
336        exposures.extend(schema.exposures.iter().cloned());
337
338        let relative_path = yaml_path
339            .strip_prefix(project_dir)
340            .unwrap_or(yaml_path)
341            .to_path_buf();
342
343        semantic_models.extend(
344            schema
345                .semantic_models
346                .iter()
347                .cloned()
348                .map(|sm| (sm, relative_path.clone())),
349        );
350        metrics.extend(
351            schema
352                .metrics
353                .iter()
354                .cloned()
355                .map(|m| (m, relative_path.clone())),
356        );
357        saved_queries.extend(
358            schema
359                .saved_queries
360                .iter()
361                .cloned()
362                .map(|sq| (sq, relative_path.clone())),
363        );
364
365        for snap_def in &schema.snapshots {
366            snapshot_defs.push((snap_def.clone(), relative_path.clone()));
367        }
368        schemas.push((schema, relative_path));
369    }
370
371    Ok(YamlParseResult {
372        model_meta,
373        exposures,
374        schemas,
375        stem_to_versioned,
376        version_aliases,
377        snapshot_defs,
378        semantic_models,
379        metrics,
380        saved_queries,
381    })
382}
383
384/// Cached extraction result for a model SQL file (refs and sources).
385/// Avoids re-running minijinja in `process_sql_edges`.
386type ExtractionCache = HashMap<PathBuf, (Vec<RefCall>, Vec<SourceCall>)>;
387
388/// Result of parallel extraction for a single model SQL file
389struct ModelExtraction {
390    sql_path: PathBuf,
391    model_name: String,
392    extraction: Option<JinjaExtraction>,
393    columns: Vec<String>,
394    /// Whether this extraction came from the disk cache (no need to re-save)
395    from_cache: bool,
396}
397
398/// Create nodes for model SQL files (with duplicate detection).
399/// Returns an in-memory cache of refs/sources (for `process_sql_edges`)
400/// and updates the disk cache with newly extracted results.
401///
402/// `stem_to_versioned` maps SQL file stems to `(versioned_unique_id, base_model_name)`.
403/// When a file stem is present in this map, the node is registered under the
404/// versioned unique_id (e.g. `model.my_model.v2`) and model metadata is looked up
405/// under the base name.
406#[allow(clippy::too_many_arguments)]
407fn process_model_files(
408    gb: &mut GraphBuilder,
409    files: &DiscoveredFiles,
410    project_dir: &Path,
411    model_meta: &HashMap<String, YamlModelMeta>,
412    macro_prefix: &str,
413    disk_cache: &mut cache::ExtractionCache,
414    vars: &HashMap<String, serde_json::Value>,
415    stem_to_versioned: &HashMap<String, (String, String)>,
416) -> ExtractionCache {
417    // Parallel phase: read files and run minijinja extraction concurrently.
418    // Uses disk cache (immutable borrow) to skip rendering for unchanged files.
419    let cache_ref = &*disk_cache;
420    let extractions: Vec<ModelExtraction> = files
421        .model_sql_files
422        .par_iter()
423        .map(|sql_path| {
424            let model_name = file_stem_str(sql_path);
425
426            // Check disk cache first
427            if let Some(cached) = cache_ref.get(sql_path, project_dir) {
428                let sql_content = std::fs::read_to_string(sql_path).ok();
429                let columns = sql_content
430                    .as_ref()
431                    .map(|content| extract_select_columns(content))
432                    .unwrap_or_default();
433                return ModelExtraction {
434                    sql_path: sql_path.clone(),
435                    model_name,
436                    extraction: Some(cached.clone()),
437                    columns,
438                    from_cache: true,
439                };
440            }
441
442            let sql_content = std::fs::read_to_string(sql_path).ok();
443
444            let extraction = sql_content
445                .as_ref()
446                .map(|content| extract_all_with_vars(content, macro_prefix, vars));
447
448            let columns = sql_content
449                .as_ref()
450                .map(|content| extract_select_columns(content))
451                .unwrap_or_default();
452
453            ModelExtraction {
454                sql_path: sql_path.clone(),
455                model_name,
456                extraction,
457                columns,
458                from_cache: false,
459            }
460        })
461        .collect();
462
463    // Sequential phase: insert nodes into the graph and update disk cache
464    let mut model_name_paths: HashMap<String, std::path::PathBuf> = HashMap::new();
465    let mut mem_cache: ExtractionCache = HashMap::new();
466
467    for me in extractions {
468        if let Some(existing_path) = model_name_paths.get(&me.model_name) {
469            crate::warn!(
470                "duplicate model name '{}' in {} and {}",
471                me.model_name,
472                existing_path.display(),
473                me.sql_path.display()
474            );
475        }
476        model_name_paths.insert(me.model_name.clone(), me.sql_path.clone());
477
478        let from_cache = me.from_cache;
479        let (sql_config, cached_refs_sources) = match me.extraction {
480            Some(ext) => {
481                // Save newly extracted results to disk cache
482                if !from_cache {
483                    disk_cache.insert(&me.sql_path, project_dir, &ext);
484                }
485                (ext.config, Some((ext.refs, ext.sources)))
486            }
487            None => (Default::default(), None),
488        };
489
490        if let Some(rs) = cached_refs_sources {
491            mem_cache.insert(me.sql_path.clone(), rs);
492        }
493
494        // Resolve versioned unique_id and base model name for YAML metadata lookup.
495        let (unique_id, label, meta_key) =
496            if let Some((versioned_id, base_name)) = stem_to_versioned.get(&me.model_name) {
497                (
498                    versioned_id.clone(),
499                    // label: e.g. "my_model.v2"
500                    versioned_id
501                        .strip_prefix("model.")
502                        .unwrap_or(versioned_id)
503                        .to_string(),
504                    base_name.as_str(),
505                )
506            } else {
507                let uid = format!("model.{}", me.model_name);
508                (uid, me.model_name.clone(), me.model_name.as_str())
509            };
510
511        let yaml_meta = model_meta.get(meta_key);
512
513        let materialization = sql_config
514            .materialized
515            .or_else(|| yaml_meta.and_then(|m| m.materialization.clone()));
516
517        let mut tags = sql_config.tags;
518        if let Some(meta) = yaml_meta {
519            tags.extend(meta.tags.clone());
520        }
521        tags.sort();
522        tags.dedup();
523
524        let relative_path = me
525            .sql_path
526            .strip_prefix(project_dir)
527            .unwrap_or(&me.sql_path)
528            .to_path_buf();
529
530        // Prefer YAML-defined columns; fall back to SQL extraction (best-effort)
531        let columns = match yaml_meta {
532            Some(m) if !m.columns.is_empty() => m.columns.clone(),
533            _ => me.columns,
534        };
535
536        gb.add_node(NodeData {
537            unique_id,
538            label,
539            node_type: NodeType::Model,
540            file_path: Some(relative_path),
541            description: yaml_meta.and_then(|m| m.description.clone()),
542            materialization,
543            tags,
544            columns,
545            exposure: None,
546            aliases: vec![],
547        });
548    }
549
550    mem_cache
551}
552
553/// Create nodes for simple file-based resources (seeds, snapshots)
554fn process_simple_nodes(
555    gb: &mut GraphBuilder,
556    paths: &[std::path::PathBuf],
557    project_dir: &Path,
558    prefix: &str,
559    node_type: NodeType,
560) {
561    for path in paths {
562        let name = file_stem_str(path);
563        let unique_id = format!("{}.{}", prefix, name);
564        let relative_path = path.strip_prefix(project_dir).unwrap_or(path).to_path_buf();
565
566        gb.add_node(NodeData {
567            unique_id,
568            label: name,
569            node_type,
570            file_path: Some(relative_path),
571            description: None,
572            materialization: None,
573            tags: vec![],
574            columns: vec![],
575            exposure: None,
576            aliases: vec![],
577        });
578    }
579}
580
581/// Parse SQL files for ref()/source() calls and add edges.
582/// `extraction_cache` contains pre-extracted refs/sources for model files
583/// (from `process_model_files`) to avoid redundant minijinja renders.
584/// `stem_to_versioned` is used to locate versioned model nodes by SQL file
585/// stem without relying on node_map aliases (which may point to a different
586/// version when `defined_in` uses a base-model name).
587fn process_sql_edges(
588    gb: &mut GraphBuilder,
589    files: &DiscoveredFiles,
590    project_dir: &Path,
591    macro_prefix: &str,
592    extraction_cache: &ExtractionCache,
593    vars: &HashMap<String, serde_json::Value>,
594    stem_to_versioned: &HashMap<String, (String, String)>,
595) -> Result<()> {
596    let all_sql_files: Vec<(&std::path::PathBuf, &str)> = files
597        .model_sql_files
598        .iter()
599        .map(|p| (p, "model"))
600        .chain(files.snapshot_sql_files.iter().map(|p| (p, "snapshot")))
601        .chain(files.test_sql_files.iter().map(|p| (p, "test")))
602        .collect();
603
604    for (sql_path, file_type) in &all_sql_files {
605        let node_name = file_stem_str(sql_path);
606        let node_unique_id = format!("{}.{}", file_type, node_name);
607
608        // Create test nodes on the fly
609        if *file_type == "test" {
610            let relative_path = sql_path
611                .strip_prefix(project_dir)
612                .unwrap_or(sql_path)
613                .to_path_buf();
614            gb.add_node(NodeData {
615                unique_id: node_unique_id.clone(),
616                label: node_name.clone(),
617                node_type: NodeType::Test,
618                file_path: Some(relative_path),
619                description: None,
620                materialization: None,
621                tags: vec![],
622                columns: vec![],
623                exposure: None,
624                aliases: vec![],
625            });
626        }
627
628        // For model files, resolve via stem_to_versioned to get the exact versioned
629        // node ID. This avoids the collision where node_map["model.my_model"] already
630        // points to the latest-version alias rather than the file being processed.
631        let current_idx = if *file_type == "model" {
632            let lookup_id = stem_to_versioned
633                .get(&node_name)
634                .map(|(versioned_id, _)| versioned_id.as_str())
635                .unwrap_or(&node_unique_id);
636            match gb.node_map.get(lookup_id) {
637                Some(&idx) => idx,
638                None => continue,
639            }
640        } else {
641            match gb.node_map.get(&node_unique_id) {
642                Some(&idx) => idx,
643                None => continue,
644            }
645        };
646
647        // Use cached extraction for model files; extract fresh for others
648        let owned;
649        let (refs, sources) = if let Some(cached) = extraction_cache.get(*sql_path) {
650            (&cached.0, &cached.1)
651        } else {
652            let content = read_file(sql_path)?;
653            owned = extract_refs_and_sources_with_vars(&content, macro_prefix, vars);
654            (&owned.0, &owned.1)
655        };
656
657        // Use EdgeType::Test when the target node is a test, so all test
658        // relationships render with consistent edge labels/styles.
659        let is_test = *file_type == "test";
660
661        for ref_call in refs {
662            let dep_idx =
663                gb.get_or_create_phantom_ref(&ref_call.name, ref_call.version.clone(), sql_path);
664            let edge_type = if is_test {
665                EdgeType::Test
666            } else {
667                EdgeType::Ref
668            };
669            gb.graph
670                .add_edge(dep_idx, current_idx, EdgeData::direct(edge_type));
671        }
672
673        for source_call in sources {
674            let source_idx = gb.get_or_create_phantom_source(
675                &source_call.source_name,
676                &source_call.table_name,
677                sql_path,
678            );
679            let edge_type = if is_test {
680                EdgeType::Test
681            } else {
682                EdgeType::Source
683            };
684            gb.graph
685                .add_edge(source_idx, current_idx, EdgeData::direct(edge_type));
686        }
687    }
688
689    Ok(())
690}
691
692/// Create exposure nodes and edges to their dependencies
693fn process_exposures(gb: &mut GraphBuilder, exposures: &[ExposureDefinition]) {
694    for exposure in exposures {
695        let unique_id = format!("exposure.{}", exposure.name);
696        let idx = gb.add_node(NodeData {
697            unique_id,
698            label: exposure.name.clone(),
699            node_type: NodeType::Exposure,
700            file_path: None,
701            description: exposure.description.clone(),
702            materialization: None,
703            tags: vec![],
704            columns: vec![],
705            exposure: Some(ExposureInfo {
706                label: exposure.label.clone(),
707                exposure_type: exposure.exposure_type.clone(),
708                url: exposure.url.clone(),
709                maturity: exposure.maturity.clone(),
710                owner: exposure.owner.as_ref().map(|o| OwnerInfo {
711                    name: o.name.as_ref().filter(|s| !s.trim().is_empty()).cloned(),
712                    email: o.email.as_ref().filter(|s| !s.trim().is_empty()).cloned(),
713                }),
714            }),
715            aliases: vec![],
716        });
717
718        for dep in &exposure.depends_on {
719            if let Some((model_name, version)) = parse_exposure_ref(dep) {
720                let dep_id = if let Some(ref v) = version {
721                    format!("model.{}.v{}", model_name, v)
722                } else {
723                    resolve_ref(&model_name, &gb.node_map)
724                };
725                if let Some(&dep_idx) = gb.node_map.get(&dep_id) {
726                    gb.graph
727                        .add_edge(dep_idx, idx, EdgeData::direct(EdgeType::Exposure));
728                }
729            }
730        }
731    }
732}
733
/// Make `candidate` unique with respect to the keys of `node_map`.
///
/// If `candidate` is not yet a key, returns `(candidate, None)`. Otherwise it
/// appends the smallest suffix `_2`, `_3`, … that yields an unused key and
/// returns `(suffixed_id, Some(suffix))`, so callers can give labels the same
/// suffix and keep them distinct too.
///
/// Only keys are inspected, so the map's value type is generic. Existing
/// callers pass the builder's `HashMap<String, NodeIndex>` unchanged.
fn dedup_unique_id<V>(
    candidate: &str,
    node_map: &HashMap<String, V>,
) -> (String, Option<String>) {
    if !node_map.contains_key(candidate) {
        return (candidate.to_string(), None);
    }
    // The map is finite, so a free suffix always exists; u64 cannot
    // realistically overflow before one is found.
    let mut n: u64 = 2;
    loop {
        let suffix = format!("_{}", n);
        let suffixed = format!("{}{}", candidate, suffix);
        if !node_map.contains_key(&suffixed) {
            return (suffixed, Some(suffix));
        }
        n += 1;
    }
}
755
756/// Add a generic test node to the graph and connect it to the parent.
757fn add_generic_test_node(
758    gb: &mut GraphBuilder,
759    parent_idx: NodeIndex,
760    unique_id: String,
761    label: String,
762    file_path: Option<PathBuf>,
763) {
764    let idx = gb.add_node(NodeData {
765        unique_id,
766        label,
767        node_type: NodeType::Test,
768        file_path,
769        description: None,
770        materialization: None,
771        tags: vec![],
772        columns: vec![],
773        exposure: None,
774        aliases: vec![],
775    });
776    gb.graph
777        .add_edge(parent_idx, idx, EdgeData::direct(EdgeType::Test));
778}
779
780/// Create test nodes for YAML-declared generic tests (not_null, unique, etc.)
781/// and connect them to their parent model/source nodes.
782fn process_generic_tests(gb: &mut GraphBuilder, schemas: &[(SchemaFile, PathBuf)]) {
783    for (schema, yaml_path) in schemas {
784        let file_path = Some(yaml_path.clone());
785
786        // Model-level generic tests.
787        // For versioned models, `model.{name}` is an alias to the latest version node,
788        // so the lookup still works without special-casing.
789        for model_def in &schema.models {
790            let parent_id = format!("model.{}", model_def.name);
791            let parent_idx = match gb.node_map.get(&parent_id) {
792                Some(&idx) => idx,
793                None => continue,
794            };
795
796            // Model-level tests (not attached to a column)
797            for test_def in &model_def.tests {
798                let test_name = match test_def.test_name() {
799                    Some(name) => name,
800                    None => continue,
801                };
802                let candidate = format!("test.{}.{}", test_name, model_def.name);
803                let (unique_id, suffix) = dedup_unique_id(&candidate, &gb.node_map);
804                let mut label = format!("{}_{}", test_name, model_def.name);
805                if let Some(s) = suffix {
806                    label.push_str(&s);
807                }
808                add_generic_test_node(gb, parent_idx, unique_id, label, file_path.clone());
809            }
810
811            // Column-level tests
812            for col in &model_def.columns {
813                for test_def in &col.tests {
814                    let test_name = match test_def.test_name() {
815                        Some(name) => name,
816                        None => continue,
817                    };
818                    let candidate = format!("test.{}.{}.{}", test_name, model_def.name, col.name);
819                    let (unique_id, suffix) = dedup_unique_id(&candidate, &gb.node_map);
820                    let mut label = format!("{}_{}_{}", test_name, model_def.name, col.name);
821                    if let Some(s) = suffix {
822                        label.push_str(&s);
823                    }
824                    add_generic_test_node(gb, parent_idx, unique_id, label, file_path.clone());
825                }
826            }
827        }
828
829        // Source-level generic tests (column-level only)
830        for source_def in &schema.sources {
831            for table in &source_def.tables {
832                let parent_id = format!("source.{}.{}", source_def.name, table.name);
833                let parent_idx = match gb.node_map.get(&parent_id) {
834                    Some(&idx) => idx,
835                    None => continue,
836                };
837                for col in &table.columns {
838                    for test_def in &col.tests {
839                        let test_name = match test_def.test_name() {
840                            Some(name) => name,
841                            None => continue,
842                        };
843                        let candidate = format!(
844                            "test.{}.{}.{}.{}",
845                            test_name, source_def.name, table.name, col.name
846                        );
847                        let (unique_id, suffix) = dedup_unique_id(&candidate, &gb.node_map);
848                        let mut label = format!(
849                            "{}_{}_{}_{}",
850                            test_name, source_def.name, table.name, col.name
851                        );
852                        if let Some(s) = suffix {
853                            label.push_str(&s);
854                        }
855                        add_generic_test_node(gb, parent_idx, unique_id, label, file_path.clone());
856                    }
857                }
858            }
859        }
860    }
861}
862
863/// Register YAML-only snapshot nodes (dbt v1.9+) and add their upstream edges.
864/// Snapshots already registered from a SQL file are skipped; for those without a
865/// matching SQL file the node is created here and linked to the upstream model via
866/// the `relation: ref('...')` field.
867///
868/// Two-pass approach: first register all nodes so that forward references between
869/// YAML-only snapshots resolve correctly, then add edges.
870fn process_yaml_snapshot_nodes(
871    gb: &mut GraphBuilder,
872    snapshot_defs: &[(SnapshotDefinition, PathBuf)],
873) {
874    // Pass 1: register all YAML-only snapshot nodes.
875    // `gb.node_map.contains_key` guards against both SQL-registered nodes (already
876    // present before this pass) and duplicate YAML definitions (added earlier in
877    // this same pass), so each unique_id is added at most once.
878    let mut yaml_registered = std::collections::HashSet::<String>::new();
879    for (snap_def, yaml_path) in snapshot_defs {
880        let unique_id = format!("snapshot.{}", snap_def.name);
881        if gb.node_map.contains_key(&unique_id) {
882            continue;
883        }
884        gb.add_node(NodeData {
885            unique_id: unique_id.clone(),
886            label: snap_def.name.clone(),
887            node_type: NodeType::Snapshot,
888            file_path: Some(yaml_path.clone()),
889            description: snap_def.description.clone(),
890            materialization: None,
891            tags: vec![],
892            columns: vec![],
893            exposure: None,
894            aliases: vec![],
895        });
896        yaml_registered.insert(unique_id);
897    }
898
899    // Pass 2: resolve upstream edges now that all snapshot nodes exist.
900    for (snap_def, yaml_path) in snapshot_defs {
901        let unique_id = format!("snapshot.{}", snap_def.name);
902        if !yaml_registered.remove(&unique_id) {
903            continue;
904        }
905        let Some(&snap_idx) = gb.node_map.get(&unique_id) else {
906            continue;
907        };
908        if let Some(relation) = &snap_def.relation {
909            if let Some((source_name, table_name)) = parse_relation_source(relation) {
910                let dep_idx =
911                    gb.get_or_create_phantom_source(&source_name, &table_name, yaml_path.as_path());
912                gb.graph
913                    .add_edge(dep_idx, snap_idx, EdgeData::direct(EdgeType::Source));
914            } else if let Some((model_name, version)) = parse_exposure_ref(relation) {
915                let dep_idx =
916                    gb.get_or_create_phantom_ref(&model_name, version, yaml_path.as_path());
917                gb.graph
918                    .add_edge(dep_idx, snap_idx, EdgeData::direct(EdgeType::Ref));
919            }
920        }
921    }
922}
923
924/// Build and connect semantic layer nodes (semantic_models, metrics, saved_queries).
925///
926/// Each definition is paired with the relative path of the YAML file it came from,
927/// which is stored on the node for debuggability (consistent with other YAML-derived nodes).
928///
929/// Pass ordering:
930///   1. Register all semantic_model nodes (enables forward references between metrics).
931///   2. Build measure → semantic_model_name map.
932///   3. Add model → semantic_model edges via `model: ref('...')`.
933///   4. Register all metric nodes.
934///   5. Add semantic_model → metric edges (Simple metrics via measure lookup).
935///   6. Add metric → metric edges (Ratio/Derived/Conversion/Cumulative).
936///   7. Register all saved_query nodes and add metric → saved_query edges.
937fn process_semantic_layer(
938    gb: &mut GraphBuilder,
939    semantic_models: &[(SemanticModelDefinition, PathBuf)],
940    metrics: &[(MetricDefinition, PathBuf)],
941    saved_queries: &[(SavedQueryDefinition, PathBuf)],
942) {
943    // Pass 1: register semantic_model nodes
944    for (sm, yaml_path) in semantic_models {
945        let unique_id = format!("semantic_model.{}", sm.name);
946        if gb.node_map.contains_key(&unique_id) {
947            continue;
948        }
949        gb.add_node(NodeData {
950            unique_id,
951            label: sm.label.as_deref().unwrap_or(&sm.name).to_string(),
952            node_type: NodeType::SemanticModel,
953            file_path: Some(yaml_path.clone()),
954            description: sm.description.clone(),
955            materialization: None,
956            tags: vec![],
957            columns: vec![],
958            exposure: None,
959            aliases: vec![],
960        });
961    }
962
963    // Pass 2: build measure_name → semantic_model_name map and add model edges
964    let mut measure_to_sem: HashMap<String, String> = HashMap::new();
965    for (sm, yaml_path) in semantic_models {
966        let sem_id = format!("semantic_model.{}", sm.name);
967        let Some(&sem_idx) = gb.node_map.get(&sem_id) else {
968            continue;
969        };
970        for measure in &sm.measures {
971            if let Some(existing) = measure_to_sem.get(&measure.name) {
972                if existing != &sm.name {
973                    crate::warn!(
974                        "measure '{}' defined in both semantic_model '{}' and '{}'; \
975                         linking metrics to '{}'",
976                        measure.name,
977                        existing,
978                        sm.name,
979                        existing
980                    );
981                }
982            } else {
983                measure_to_sem.insert(measure.name.clone(), sm.name.clone());
984            }
985        }
986        // Add edge: model_node → semantic_model_node
987        if let Some(model_ref) = &sm.model
988            && let Some((model_name, version)) = parse_exposure_ref(model_ref)
989        {
990            let dep_idx = gb.get_or_create_phantom_ref(&model_name, version, yaml_path.as_path());
991            gb.graph
992                .add_edge(dep_idx, sem_idx, EdgeData::direct(EdgeType::Ref));
993        }
994    }
995
996    // Pass 3: register metric nodes
997    for (metric, yaml_path) in metrics {
998        let unique_id = format!("metric.{}", metric.name);
999        if gb.node_map.contains_key(&unique_id) {
1000            continue;
1001        }
1002        gb.add_node(NodeData {
1003            unique_id,
1004            label: metric.label.as_deref().unwrap_or(&metric.name).to_string(),
1005            node_type: NodeType::Metric,
1006            file_path: Some(yaml_path.clone()),
1007            description: metric.description.clone(),
1008            materialization: None,
1009            tags: vec![],
1010            columns: vec![],
1011            exposure: None,
1012            aliases: vec![],
1013        });
1014    }
1015
1016    // Pass 4: add semantic_model → metric and metric → metric edges
1017    for (metric, yaml_path) in metrics {
1018        let metric_id = format!("metric.{}", metric.name);
1019        let Some(&metric_idx) = gb.node_map.get(&metric_id) else {
1020            continue;
1021        };
1022        // Link to semantic models via measure references (Simple, Conversion, …).
1023        // Deduplicate: a conversion metric's base_measure and conversion_measure may
1024        // both belong to the same semantic model, which would otherwise add the edge twice.
1025        // Use seen-set + ordered iteration to keep insertion order deterministic.
1026        let mut seen_sem_indices = std::collections::HashSet::new();
1027        for measure_name in metric.measure_refs() {
1028            let Some(sem_name) = measure_to_sem.get(measure_name) else {
1029                continue;
1030            };
1031            let sem_id = format!("semantic_model.{}", sem_name);
1032            let Some(&sem_idx) = gb.node_map.get(&sem_id) else {
1033                continue;
1034            };
1035            if seen_sem_indices.insert(sem_idx) {
1036                gb.graph
1037                    .add_edge(sem_idx, metric_idx, EdgeData::direct(EdgeType::Ref));
1038            }
1039        }
1040        // Ratio/Derived/Conversion/Cumulative: link to upstream metrics (deduplicated,
1041        // preserving original order so graph insertion is deterministic)
1042        let mut seen_metric_refs = std::collections::HashSet::new();
1043        for dep_metric_name in metric.metric_refs() {
1044            if !seen_metric_refs.insert(dep_metric_name) {
1045                continue;
1046            }
1047            let dep_id = format!("metric.{}", dep_metric_name);
1048            let dep_idx = if let Some(&idx) = gb.node_map.get(&dep_id) {
1049                idx
1050            } else {
1051                crate::warn!(
1052                    "unresolved metric ref '{}' from metric '{}'",
1053                    dep_metric_name,
1054                    metric.name
1055                );
1056                gb.add_node(NodeData {
1057                    unique_id: dep_id,
1058                    label: dep_metric_name.to_string(),
1059                    node_type: NodeType::Phantom,
1060                    file_path: Some(yaml_path.clone()),
1061                    description: None,
1062                    materialization: None,
1063                    tags: vec![],
1064                    columns: vec![],
1065                    exposure: None,
1066                    aliases: vec![],
1067                })
1068            };
1069            gb.graph
1070                .add_edge(dep_idx, metric_idx, EdgeData::direct(EdgeType::Ref));
1071        }
1072    }
1073
1074    // Pass 5: register saved_query nodes and add metric → saved_query edges
1075    for (sq, yaml_path) in saved_queries {
1076        let sq_id = format!("saved_query.{}", sq.name);
1077        if gb.node_map.contains_key(&sq_id) {
1078            continue;
1079        }
1080        let sq_idx = gb.add_node(NodeData {
1081            unique_id: sq_id.clone(),
1082            label: sq.label.as_deref().unwrap_or(&sq.name).to_string(),
1083            node_type: NodeType::SavedQuery,
1084            file_path: Some(yaml_path.clone()),
1085            description: sq.description.clone(),
1086            materialization: None,
1087            tags: vec![],
1088            columns: vec![],
1089            exposure: None,
1090            aliases: vec![],
1091        });
1092        if let Some(qp) = &sq.query_params {
1093            for metric_name in &qp.metrics {
1094                let metric_dep_id = format!("metric.{}", metric_name);
1095                let dep_idx = if let Some(&idx) = gb.node_map.get(&metric_dep_id) {
1096                    idx
1097                } else {
1098                    crate::warn!(
1099                        "unresolved metric ref '{}' in saved_query '{}'",
1100                        metric_name,
1101                        sq.name
1102                    );
1103                    gb.add_node(NodeData {
1104                        unique_id: metric_dep_id,
1105                        label: metric_name.clone(),
1106                        node_type: NodeType::Phantom,
1107                        file_path: Some(yaml_path.clone()),
1108                        description: None,
1109                        materialization: None,
1110                        tags: vec![],
1111                        columns: vec![],
1112                        exposure: None,
1113                        aliases: vec![],
1114                    })
1115                };
1116                gb.graph
1117                    .add_edge(dep_idx, sq_idx, EdgeData::direct(EdgeType::Ref));
1118            }
1119        }
1120    }
1121}
1122
1123/// Build the lineage graph from discovered files.
1124/// If `cache_dir` is provided, it is used as the cache directory;
1125/// otherwise the cache is stored under `<project_dir>/.dlin_cache/`.
1126/// If `no_cache` is true, the extraction cache is completely disabled.
1127/// If `refresh_cache` is true, the existing cache is ignored but new results
1128/// are written to disk.
1129pub fn build_graph(
1130    project_dir: &Path,
1131    files: &DiscoveredFiles,
1132    cache_dir: Option<&Path>,
1133    no_cache: bool,
1134    refresh_cache: bool,
1135    vars: &HashMap<String, serde_json::Value>,
1136) -> Result<LineageGraph> {
1137    let mut gb = GraphBuilder::new();
1138    let macro_prefix = load_macro_prefix(files);
1139    let mut disk_cache = if no_cache {
1140        cache::ExtractionCache::disabled()
1141    } else if refresh_cache {
1142        cache::ExtractionCache::fresh(project_dir, &macro_prefix, vars, cache_dir)
1143    } else {
1144        cache::ExtractionCache::load(project_dir, &macro_prefix, vars, cache_dir)
1145    };
1146
1147    let yaml_result = process_yaml_files(&mut gb, files, project_dir)?;
1148    let extraction_cache = process_model_files(
1149        &mut gb,
1150        files,
1151        project_dir,
1152        &yaml_result.model_meta,
1153        &macro_prefix,
1154        &mut disk_cache,
1155        vars,
1156        &yaml_result.stem_to_versioned,
1157    );
1158    // Register "model.name" aliases to the latest versioned node so that
1159    // unversioned ref('name') calls resolve correctly.
1160    for (unversioned_id, latest_versioned_id) in &yaml_result.version_aliases {
1161        gb.add_alias(unversioned_id.clone(), latest_versioned_id);
1162    }
1163    process_simple_nodes(
1164        &mut gb,
1165        &files.seed_files,
1166        project_dir,
1167        "seed",
1168        NodeType::Seed,
1169    );
1170    process_simple_nodes(
1171        &mut gb,
1172        &files.snapshot_sql_files,
1173        project_dir,
1174        "snapshot",
1175        NodeType::Snapshot,
1176    );
1177    process_yaml_snapshot_nodes(&mut gb, &yaml_result.snapshot_defs);
1178    process_sql_edges(
1179        &mut gb,
1180        files,
1181        project_dir,
1182        &macro_prefix,
1183        &extraction_cache,
1184        vars,
1185        &yaml_result.stem_to_versioned,
1186    )?;
1187    process_exposures(&mut gb, &yaml_result.exposures);
1188    process_generic_tests(&mut gb, &yaml_result.schemas);
1189    process_semantic_layer(
1190        &mut gb,
1191        &yaml_result.semantic_models,
1192        &yaml_result.metrics,
1193        &yaml_result.saved_queries,
1194    );
1195
1196    disk_cache.save();
1197
1198    Ok(gb.graph)
1199}
1200
1201/// Try to resolve a ref name to a node unique_id
1202fn resolve_ref(name: &str, node_map: &HashMap<String, NodeIndex>) -> String {
1203    // Try model first, then seed, then snapshot
1204    let model_id = format!("model.{}", name);
1205    if node_map.contains_key(&model_id) {
1206        return model_id;
1207    }
1208
1209    let seed_id = format!("seed.{}", name);
1210    if node_map.contains_key(&seed_id) {
1211        return seed_id;
1212    }
1213
1214    let snapshot_id = format!("snapshot.{}", name);
1215    if node_map.contains_key(&snapshot_id) {
1216        return snapshot_id;
1217    }
1218
1219    // Default to model
1220    model_id
1221}
1222
1223/// Parse a source('schema', 'table') string (no Jinja delimiters).
1224/// Returns (source_name, table_name), or None if the string is not a source() call.
1225fn parse_relation_source(relation: &str) -> Option<(String, String)> {
1226    let wrapped = format!("{{{{ {} }}}}", relation.trim());
1227    extract_sources(&wrapped)
1228        .into_iter()
1229        .next()
1230        .map(|s| (s.source_name, s.table_name))
1231}
1232
1233/// Parse a ref('name') or ref('name', version=N) string from exposure depends_on.
1234/// Returns (model_name, optional_version). Source refs and package-qualified refs
1235/// (cross-package) return None — cross-package exposure links are not yet supported.
1236fn parse_exposure_ref(dep: &str) -> Option<(String, Option<String>)> {
1237    let dep = dep.trim();
1238    if dep.starts_with("ref(") {
1239        // Wrap in {{ }} so we can reuse the SQL regex extractor.
1240        let wrapped = format!("{{{{ {} }}}}", dep);
1241        let refs = crate::parser::sql::extract_refs(&wrapped);
1242        // Skip package-qualified refs — cross-package resolution is not supported here.
1243        refs.into_iter()
1244            .next()
1245            .filter(|r| r.package.is_none())
1246            .map(|r| (r.name, r.version))
1247    } else {
1248        // source() and other strings: no edge
1249        None
1250    }
1251}
1252
1253#[cfg(test)]
1254mod tests;