Skip to main content

dlin_core/graph/
builder.rs

1use anyhow::Result;
2use petgraph::stable_graph::NodeIndex;
3use rayon::prelude::*;
4use std::collections::HashMap;
5use std::path::Path;
6
7use std::path::PathBuf;
8
9use crate::graph::types::{ExposureInfo, OwnerInfo};
10use crate::parser::cache;
11use crate::parser::columns::extract_select_columns;
12use crate::parser::discovery::DiscoveredFiles;
13use crate::parser::jinja::JinjaExtraction;
14use crate::parser::sql::{
15    RefCall, SourceCall, extract_all_with_vars, extract_refs_and_sources_with_vars,
16};
17use crate::parser::yaml_schema::{ExposureDefinition, SchemaFile, parse_schema_file};
18
19/// Read all macro SQL files, filter out unparseable ones, and return a
20/// pre-built prefix string for prepending to model templates.
21fn load_macro_prefix(files: &DiscoveredFiles) -> String {
22    let sources: Vec<String> = files
23        .macro_sql_files
24        .iter()
25        .filter_map(|path| match std::fs::read_to_string(path) {
26            Ok(content) => Some(content),
27            Err(e) => {
28                crate::warn!("could not read macro file {}: {}", path.display(), e);
29                None
30            }
31        })
32        .collect();
33    crate::parser::jinja::build_macro_prefix(&sources)
34}
35
36use super::types::*;
37
38/// Shared state threaded through the build_graph helper functions
39struct GraphBuilder {
40    graph: LineageGraph,
41    node_map: HashMap<String, NodeIndex>,
42}
43
44impl GraphBuilder {
45    fn new() -> Self {
46        Self {
47            graph: LineageGraph::new(),
48            node_map: HashMap::new(),
49        }
50    }
51
52    /// Add a node and register it in the node map
53    fn add_node(&mut self, data: NodeData) -> NodeIndex {
54        let idx = self.graph.add_node(data);
55        let unique_id = self.graph[idx].unique_id.clone();
56        self.node_map.insert(unique_id, idx);
57        idx
58    }
59
60    /// Get or create a phantom ref node, returning its index
61    fn get_or_create_phantom_ref(&mut self, ref_name: &str, sql_path: &Path) -> NodeIndex {
62        let dep_id = resolve_ref(ref_name, &self.node_map);
63        if let Some(&idx) = self.node_map.get(&dep_id) {
64            return idx;
65        }
66        crate::warn!("unresolved ref '{}' in {}", ref_name, sql_path.display());
67        let phantom_id = format!("model.{}", ref_name);
68        self.add_node(NodeData {
69            unique_id: phantom_id,
70            label: ref_name.to_string(),
71            node_type: NodeType::Phantom,
72            file_path: None,
73            description: None,
74            materialization: None,
75            tags: vec![],
76            columns: vec![],
77            exposure: None,
78        })
79    }
80
81    /// Get or create a phantom source node, returning its index
82    fn get_or_create_phantom_source(
83        &mut self,
84        source_name: &str,
85        table_name: &str,
86        sql_path: &Path,
87    ) -> NodeIndex {
88        let source_id = format!("source.{}.{}", source_name, table_name);
89        if let Some(&idx) = self.node_map.get(&source_id) {
90            return idx;
91        }
92        crate::warn!(
93            "unresolved source '{}.{}' in {}",
94            source_name,
95            table_name,
96            sql_path.display()
97        );
98        let label = format!("{}.{}", source_name, table_name);
99        self.add_node(NodeData {
100            unique_id: source_id,
101            label,
102            node_type: NodeType::Phantom,
103            file_path: None,
104            description: None,
105            materialization: None,
106            tags: vec![],
107            columns: vec![],
108            exposure: None,
109        })
110    }
111}
112
113/// Read a file with a descriptive error
114fn read_file(path: &Path) -> Result<String> {
115    std::fs::read_to_string(path).map_err(|e| {
116        crate::error::DbtLineageError::FileReadError {
117            path: path.to_path_buf(),
118            source: e,
119        }
120        .into()
121    })
122}
123
/// Return the file stem of `path` (file name without its last extension).
///
/// Falls back to the literal `"unknown"` when the path has no file name or
/// the stem is not valid UTF-8.
fn file_stem_str(path: &Path) -> String {
    match path.file_stem().and_then(|stem| stem.to_str()) {
        Some(stem) => stem.to_owned(),
        None => String::from("unknown"),
    }
}
131
132/// Create source nodes from a single schema file's source definitions
133fn add_source_nodes(
134    gb: &mut GraphBuilder,
135    schema: &crate::parser::yaml_schema::SchemaFile,
136    yaml_path: &Path,
137    project_dir: &Path,
138) {
139    let relative_path = yaml_path
140        .strip_prefix(project_dir)
141        .unwrap_or(yaml_path)
142        .to_path_buf();
143    for source_def in &schema.sources {
144        for table in &source_def.tables {
145            let unique_id = format!("source.{}.{}", source_def.name, table.name);
146            let label = format!("{}.{}", source_def.name, table.name);
147            gb.add_node(NodeData {
148                unique_id,
149                label,
150                node_type: NodeType::Source,
151                file_path: Some(relative_path.clone()),
152                description: table
153                    .description
154                    .clone()
155                    .or_else(|| source_def.description.clone()),
156                materialization: None,
157                tags: vec![],
158                columns: vec![],
159                exposure: None,
160            });
161        }
162    }
163}
164
/// Metadata collected from YAML for a single model.
///
/// Built by `process_yaml_files` and later merged with the values extracted
/// from the model's SQL file.
#[derive(Clone, Default)]
struct YamlModelMeta {
    /// Model description from the YAML definition, if any.
    description: Option<String>,
    /// `materialized` value from the model's YAML `config` block, if any.
    materialization: Option<String>,
    /// Model-level and config-level tags, merged, sorted and de-duplicated.
    tags: Vec<String>,
    /// Names of the columns declared for the model in YAML.
    columns: Vec<String>,
}
173
/// Result of parsing YAML schema files.
///
/// * First element: per-model YAML metadata, keyed by model name.
/// * Second element: every exposure definition found across the schema files.
/// * Third element: each parsed `SchemaFile` paired with the relative path of
///   the YAML file it was parsed from (used to populate `file_path` on test
///   nodes).
type YamlResult = (
    HashMap<String, YamlModelMeta>,
    Vec<ExposureDefinition>,
    Vec<(SchemaFile, PathBuf)>,
);
182
183/// Parse YAML schema files: create source nodes, collect model metadata, exposures,
184/// and parsed schemas (for generic test extraction).
185fn process_yaml_files(
186    gb: &mut GraphBuilder,
187    files: &DiscoveredFiles,
188    project_dir: &Path,
189) -> Result<YamlResult> {
190    let mut model_meta: HashMap<String, YamlModelMeta> = HashMap::new();
191    let mut exposures: Vec<ExposureDefinition> = Vec::new();
192    let mut schemas: Vec<(SchemaFile, PathBuf)> = Vec::new();
193
194    // Sort YAML paths so that duplicate-test-ID suffixes (_2, _3, …) are
195    // deterministic across filesystems/OSes.
196    let mut sorted_yaml_files = files.yaml_files.clone();
197    sorted_yaml_files.sort();
198
199    for yaml_path in &sorted_yaml_files {
200        let content = read_file(yaml_path)?;
201        let schema = match parse_schema_file(&content, Some(yaml_path.as_path())) {
202            Ok(s) => s,
203            Err(_) => continue,
204        };
205
206        add_source_nodes(gb, &schema, yaml_path, project_dir);
207
208        for model_def in &schema.models {
209            let mut meta = YamlModelMeta {
210                description: model_def.description.clone(),
211                columns: model_def.columns.iter().map(|c| c.name.clone()).collect(),
212                ..Default::default()
213            };
214            // Merge tags from model-level and config-level
215            let mut tags = model_def.tags.clone();
216            if let Some(cfg) = &model_def.config {
217                meta.materialization = cfg.materialized.clone();
218                tags.extend(cfg.tags.clone());
219            }
220            tags.sort();
221            tags.dedup();
222            meta.tags = tags;
223            model_meta.insert(model_def.name.clone(), meta);
224        }
225
226        exposures.extend(schema.exposures.iter().cloned());
227        let relative_path = yaml_path
228            .strip_prefix(project_dir)
229            .unwrap_or(yaml_path)
230            .to_path_buf();
231        schemas.push((schema, relative_path));
232    }
233
234    Ok((model_meta, exposures, schemas))
235}
236
/// In-memory extraction results for model SQL files: the `ref()` calls and
/// `source()` calls found in each file, keyed by the file's path as listed in
/// `DiscoveredFiles`.
/// Avoids re-running minijinja in `process_sql_edges`.
/// Not to be confused with the on-disk `cache::ExtractionCache`.
type ExtractionCache = HashMap<PathBuf, (Vec<RefCall>, Vec<SourceCall>)>;
240
/// Result of parallel extraction for a single model SQL file
struct ModelExtraction {
    /// Path of the SQL file, as listed in `DiscoveredFiles`.
    sql_path: PathBuf,
    /// Model name, taken from the file stem of `sql_path`.
    model_name: String,
    /// Extraction result; `None` when the file could not be read and there
    /// was no disk-cache entry for it.
    extraction: Option<JinjaExtraction>,
    /// Column names extracted from the SQL text (empty when the file could
    /// not be read).
    columns: Vec<String>,
    /// Whether this extraction came from the disk cache (no need to re-save)
    from_cache: bool,
}
250
251/// Create nodes for model SQL files (with duplicate detection).
252/// Returns an in-memory cache of refs/sources (for `process_sql_edges`)
253/// and updates the disk cache with newly extracted results.
254fn process_model_files(
255    gb: &mut GraphBuilder,
256    files: &DiscoveredFiles,
257    project_dir: &Path,
258    model_meta: &HashMap<String, YamlModelMeta>,
259    macro_prefix: &str,
260    disk_cache: &mut cache::ExtractionCache,
261    vars: &HashMap<String, serde_json::Value>,
262) -> ExtractionCache {
263    // Parallel phase: read files and run minijinja extraction concurrently.
264    // Uses disk cache (immutable borrow) to skip rendering for unchanged files.
265    let cache_ref = &*disk_cache;
266    let extractions: Vec<ModelExtraction> = files
267        .model_sql_files
268        .par_iter()
269        .map(|sql_path| {
270            let model_name = file_stem_str(sql_path);
271
272            // Check disk cache first
273            if let Some(cached) = cache_ref.get(sql_path, project_dir) {
274                let sql_content = std::fs::read_to_string(sql_path).ok();
275                let columns = sql_content
276                    .as_ref()
277                    .map(|content| extract_select_columns(content))
278                    .unwrap_or_default();
279                return ModelExtraction {
280                    sql_path: sql_path.clone(),
281                    model_name,
282                    extraction: Some(cached.clone()),
283                    columns,
284                    from_cache: true,
285                };
286            }
287
288            let sql_content = std::fs::read_to_string(sql_path).ok();
289
290            let extraction = sql_content
291                .as_ref()
292                .map(|content| extract_all_with_vars(content, macro_prefix, vars));
293
294            let columns = sql_content
295                .as_ref()
296                .map(|content| extract_select_columns(content))
297                .unwrap_or_default();
298
299            ModelExtraction {
300                sql_path: sql_path.clone(),
301                model_name,
302                extraction,
303                columns,
304                from_cache: false,
305            }
306        })
307        .collect();
308
309    // Sequential phase: insert nodes into the graph and update disk cache
310    let mut model_name_paths: HashMap<String, std::path::PathBuf> = HashMap::new();
311    let mut mem_cache: ExtractionCache = HashMap::new();
312
313    for me in extractions {
314        if let Some(existing_path) = model_name_paths.get(&me.model_name) {
315            crate::warn!(
316                "duplicate model name '{}' in {} and {}",
317                me.model_name,
318                existing_path.display(),
319                me.sql_path.display()
320            );
321        }
322        model_name_paths.insert(me.model_name.clone(), me.sql_path.clone());
323
324        let from_cache = me.from_cache;
325        let (sql_config, cached_refs_sources) = match me.extraction {
326            Some(ext) => {
327                // Save newly extracted results to disk cache
328                if !from_cache {
329                    disk_cache.insert(&me.sql_path, project_dir, &ext);
330                }
331                (ext.config, Some((ext.refs, ext.sources)))
332            }
333            None => (Default::default(), None),
334        };
335
336        if let Some(rs) = cached_refs_sources {
337            mem_cache.insert(me.sql_path.clone(), rs);
338        }
339
340        let yaml_meta = model_meta.get(&me.model_name);
341
342        let materialization = sql_config
343            .materialized
344            .or_else(|| yaml_meta.and_then(|m| m.materialization.clone()));
345
346        let mut tags = sql_config.tags;
347        if let Some(meta) = yaml_meta {
348            tags.extend(meta.tags.clone());
349        }
350        tags.sort();
351        tags.dedup();
352
353        let unique_id = format!("model.{}", me.model_name);
354        let relative_path = me
355            .sql_path
356            .strip_prefix(project_dir)
357            .unwrap_or(&me.sql_path)
358            .to_path_buf();
359
360        // Prefer YAML-defined columns; fall back to SQL extraction (best-effort)
361        let columns = match yaml_meta {
362            Some(m) if !m.columns.is_empty() => m.columns.clone(),
363            _ => me.columns,
364        };
365
366        gb.add_node(NodeData {
367            unique_id,
368            label: me.model_name,
369            node_type: NodeType::Model,
370            file_path: Some(relative_path),
371            description: yaml_meta.and_then(|m| m.description.clone()),
372            materialization,
373            tags,
374            columns,
375            exposure: None,
376        });
377    }
378
379    mem_cache
380}
381
382/// Create nodes for simple file-based resources (seeds, snapshots)
383fn process_simple_nodes(
384    gb: &mut GraphBuilder,
385    paths: &[std::path::PathBuf],
386    project_dir: &Path,
387    prefix: &str,
388    node_type: NodeType,
389) {
390    for path in paths {
391        let name = file_stem_str(path);
392        let unique_id = format!("{}.{}", prefix, name);
393        let relative_path = path.strip_prefix(project_dir).unwrap_or(path).to_path_buf();
394
395        gb.add_node(NodeData {
396            unique_id,
397            label: name,
398            node_type,
399            file_path: Some(relative_path),
400            description: None,
401            materialization: None,
402            tags: vec![],
403            columns: vec![],
404            exposure: None,
405        });
406    }
407}
408
409/// Parse SQL files for ref()/source() calls and add edges.
410/// `extraction_cache` contains pre-extracted refs/sources for model files
411/// (from `process_model_files`) to avoid redundant minijinja renders.
412fn process_sql_edges(
413    gb: &mut GraphBuilder,
414    files: &DiscoveredFiles,
415    project_dir: &Path,
416    macro_prefix: &str,
417    extraction_cache: &ExtractionCache,
418    vars: &HashMap<String, serde_json::Value>,
419) -> Result<()> {
420    let all_sql_files: Vec<(&std::path::PathBuf, &str)> = files
421        .model_sql_files
422        .iter()
423        .map(|p| (p, "model"))
424        .chain(files.snapshot_sql_files.iter().map(|p| (p, "snapshot")))
425        .chain(files.test_sql_files.iter().map(|p| (p, "test")))
426        .collect();
427
428    for (sql_path, file_type) in &all_sql_files {
429        let node_name = file_stem_str(sql_path);
430        let node_unique_id = format!("{}.{}", file_type, node_name);
431
432        // Create test nodes on the fly
433        if *file_type == "test" {
434            let relative_path = sql_path
435                .strip_prefix(project_dir)
436                .unwrap_or(sql_path)
437                .to_path_buf();
438            gb.add_node(NodeData {
439                unique_id: node_unique_id.clone(),
440                label: node_name,
441                node_type: NodeType::Test,
442                file_path: Some(relative_path),
443                description: None,
444                materialization: None,
445                tags: vec![],
446                columns: vec![],
447                exposure: None,
448            });
449        }
450
451        let current_idx = match gb.node_map.get(&node_unique_id) {
452            Some(&idx) => idx,
453            None => continue,
454        };
455
456        // Use cached extraction for model files; extract fresh for others
457        let owned;
458        let (refs, sources) = if let Some(cached) = extraction_cache.get(*sql_path) {
459            (&cached.0, &cached.1)
460        } else {
461            let content = read_file(sql_path)?;
462            owned = extract_refs_and_sources_with_vars(&content, macro_prefix, vars);
463            (&owned.0, &owned.1)
464        };
465
466        // Use EdgeType::Test when the target node is a test, so all test
467        // relationships render with consistent edge labels/styles.
468        let is_test = *file_type == "test";
469
470        for ref_call in refs {
471            let dep_idx = gb.get_or_create_phantom_ref(&ref_call.name, sql_path);
472            let edge_type = if is_test {
473                EdgeType::Test
474            } else {
475                EdgeType::Ref
476            };
477            gb.graph
478                .add_edge(dep_idx, current_idx, EdgeData::direct(edge_type));
479        }
480
481        for source_call in sources {
482            let source_idx = gb.get_or_create_phantom_source(
483                &source_call.source_name,
484                &source_call.table_name,
485                sql_path,
486            );
487            let edge_type = if is_test {
488                EdgeType::Test
489            } else {
490                EdgeType::Source
491            };
492            gb.graph
493                .add_edge(source_idx, current_idx, EdgeData::direct(edge_type));
494        }
495    }
496
497    Ok(())
498}
499
500/// Create exposure nodes and edges to their dependencies
501fn process_exposures(gb: &mut GraphBuilder, exposures: &[ExposureDefinition]) {
502    for exposure in exposures {
503        let unique_id = format!("exposure.{}", exposure.name);
504        let idx = gb.add_node(NodeData {
505            unique_id,
506            label: exposure.name.clone(),
507            node_type: NodeType::Exposure,
508            file_path: None,
509            description: exposure.description.clone(),
510            materialization: None,
511            tags: vec![],
512            columns: vec![],
513            exposure: Some(ExposureInfo {
514                label: exposure.label.clone(),
515                exposure_type: exposure.exposure_type.clone(),
516                url: exposure.url.clone(),
517                maturity: exposure.maturity.clone(),
518                owner: exposure.owner.as_ref().map(|o| OwnerInfo {
519                    name: o.name.as_ref().filter(|s| !s.trim().is_empty()).cloned(),
520                    email: o.email.as_ref().filter(|s| !s.trim().is_empty()).cloned(),
521                }),
522            }),
523        });
524
525        for dep in &exposure.depends_on {
526            if let Some(model_name) = parse_exposure_ref(dep) {
527                let dep_id = resolve_ref(&model_name, &gb.node_map);
528                if let Some(&dep_idx) = gb.node_map.get(&dep_id) {
529                    gb.graph
530                        .add_edge(dep_idx, idx, EdgeData::direct(EdgeType::Exposure));
531                }
532            }
533        }
534    }
535}
536
537/// Deduplicate a candidate unique_id by appending `_2`, `_3`, … if it already
538/// exists in the node map.  Returns `(unique_id, suffix)` where `suffix` is
539/// `None` when no deduplication was needed, or `Some("_2")` etc. when it was.
540/// Callers can append the suffix to labels so they stay distinct too.
541fn dedup_unique_id(
542    candidate: &str,
543    node_map: &HashMap<String, NodeIndex>,
544) -> (String, Option<String>) {
545    if !node_map.contains_key(candidate) {
546        return (candidate.to_string(), None);
547    }
548    let mut n = 2u32;
549    loop {
550        let suffix = format!("_{}", n);
551        let suffixed = format!("{}{}", candidate, suffix);
552        if !node_map.contains_key(&suffixed) {
553            return (suffixed, Some(suffix));
554        }
555        n += 1;
556    }
557}
558
559/// Add a generic test node to the graph and connect it to the parent.
560fn add_generic_test_node(
561    gb: &mut GraphBuilder,
562    parent_idx: NodeIndex,
563    unique_id: String,
564    label: String,
565    file_path: Option<PathBuf>,
566) {
567    let idx = gb.add_node(NodeData {
568        unique_id,
569        label,
570        node_type: NodeType::Test,
571        file_path,
572        description: None,
573        materialization: None,
574        tags: vec![],
575        columns: vec![],
576        exposure: None,
577    });
578    gb.graph
579        .add_edge(parent_idx, idx, EdgeData::direct(EdgeType::Test));
580}
581
582/// Create test nodes for YAML-declared generic tests (not_null, unique, etc.)
583/// and connect them to their parent model/source nodes.
584fn process_generic_tests(gb: &mut GraphBuilder, schemas: &[(SchemaFile, PathBuf)]) {
585    for (schema, yaml_path) in schemas {
586        let file_path = Some(yaml_path.clone());
587
588        // Model-level generic tests
589        for model_def in &schema.models {
590            let parent_id = format!("model.{}", model_def.name);
591            let parent_idx = match gb.node_map.get(&parent_id) {
592                Some(&idx) => idx,
593                None => continue,
594            };
595
596            // Model-level tests (not attached to a column)
597            for test_def in &model_def.tests {
598                let test_name = match test_def.test_name() {
599                    Some(name) => name,
600                    None => continue,
601                };
602                let candidate = format!("test.{}.{}", test_name, model_def.name);
603                let (unique_id, suffix) = dedup_unique_id(&candidate, &gb.node_map);
604                let mut label = format!("{}_{}", test_name, model_def.name);
605                if let Some(s) = suffix {
606                    label.push_str(&s);
607                }
608                add_generic_test_node(gb, parent_idx, unique_id, label, file_path.clone());
609            }
610
611            // Column-level tests
612            for col in &model_def.columns {
613                for test_def in &col.tests {
614                    let test_name = match test_def.test_name() {
615                        Some(name) => name,
616                        None => continue,
617                    };
618                    let candidate = format!("test.{}.{}.{}", test_name, model_def.name, col.name);
619                    let (unique_id, suffix) = dedup_unique_id(&candidate, &gb.node_map);
620                    let mut label = format!("{}_{}_{}", test_name, model_def.name, col.name);
621                    if let Some(s) = suffix {
622                        label.push_str(&s);
623                    }
624                    add_generic_test_node(gb, parent_idx, unique_id, label, file_path.clone());
625                }
626            }
627        }
628
629        // Source-level generic tests (column-level only)
630        for source_def in &schema.sources {
631            for table in &source_def.tables {
632                let parent_id = format!("source.{}.{}", source_def.name, table.name);
633                let parent_idx = match gb.node_map.get(&parent_id) {
634                    Some(&idx) => idx,
635                    None => continue,
636                };
637                for col in &table.columns {
638                    for test_def in &col.tests {
639                        let test_name = match test_def.test_name() {
640                            Some(name) => name,
641                            None => continue,
642                        };
643                        let candidate = format!(
644                            "test.{}.{}.{}.{}",
645                            test_name, source_def.name, table.name, col.name
646                        );
647                        let (unique_id, suffix) = dedup_unique_id(&candidate, &gb.node_map);
648                        let mut label = format!(
649                            "{}_{}_{}_{}",
650                            test_name, source_def.name, table.name, col.name
651                        );
652                        if let Some(s) = suffix {
653                            label.push_str(&s);
654                        }
655                        add_generic_test_node(gb, parent_idx, unique_id, label, file_path.clone());
656                    }
657                }
658            }
659        }
660    }
661}
662
663/// Build the lineage graph from discovered files.
664/// If `cache_dir` is provided, it is used as the cache directory;
665/// otherwise the cache is stored under `<project_dir>/.dlin_cache/`.
666/// If `no_cache` is true, the extraction cache is completely disabled.
667/// If `refresh_cache` is true, the existing cache is ignored but new results
668/// are written to disk.
669pub fn build_graph(
670    project_dir: &Path,
671    files: &DiscoveredFiles,
672    cache_dir: Option<&Path>,
673    no_cache: bool,
674    refresh_cache: bool,
675    vars: &HashMap<String, serde_json::Value>,
676) -> Result<LineageGraph> {
677    let mut gb = GraphBuilder::new();
678    let macro_prefix = load_macro_prefix(files);
679    let mut disk_cache = if no_cache {
680        cache::ExtractionCache::disabled()
681    } else if refresh_cache {
682        cache::ExtractionCache::fresh(project_dir, &macro_prefix, vars, cache_dir)
683    } else {
684        cache::ExtractionCache::load(project_dir, &macro_prefix, vars, cache_dir)
685    };
686
687    let (model_meta, exposures, schemas) = process_yaml_files(&mut gb, files, project_dir)?;
688    let extraction_cache = process_model_files(
689        &mut gb,
690        files,
691        project_dir,
692        &model_meta,
693        &macro_prefix,
694        &mut disk_cache,
695        vars,
696    );
697    process_simple_nodes(
698        &mut gb,
699        &files.seed_files,
700        project_dir,
701        "seed",
702        NodeType::Seed,
703    );
704    process_simple_nodes(
705        &mut gb,
706        &files.snapshot_sql_files,
707        project_dir,
708        "snapshot",
709        NodeType::Snapshot,
710    );
711    process_sql_edges(
712        &mut gb,
713        files,
714        project_dir,
715        &macro_prefix,
716        &extraction_cache,
717        vars,
718    )?;
719    process_exposures(&mut gb, &exposures);
720    process_generic_tests(&mut gb, &schemas);
721
722    disk_cache.save();
723
724    Ok(gb.graph)
725}
726
727/// Try to resolve a ref name to a node unique_id
728fn resolve_ref(name: &str, node_map: &HashMap<String, NodeIndex>) -> String {
729    // Try model first, then seed, then snapshot
730    let model_id = format!("model.{}", name);
731    if node_map.contains_key(&model_id) {
732        return model_id;
733    }
734
735    let seed_id = format!("seed.{}", name);
736    if node_map.contains_key(&seed_id) {
737        return seed_id;
738    }
739
740    let snapshot_id = format!("snapshot.{}", name);
741    if node_map.contains_key(&snapshot_id) {
742        return snapshot_id;
743    }
744
745    // Default to model
746    model_id
747}
748
/// Extract the referenced node name from an exposure `depends_on` entry.
///
/// Recognised forms (single or double quotes, surrounding whitespace allowed):
///
/// * `ref('name')` → `name`
/// * `ref('package', 'name')` → `name` (the last positional argument is the
///   node name; the package qualifier is ignored)
/// * `ref('name', v=2)` / `ref('name', version=2)` → `name` (keyword
///   arguments are ignored)
///
/// Returns `None` for `source(...)` entries (no edges are created for sources
/// in exposures, for simplicity), for anything that is not a `ref(...)` call,
/// and for a `ref(...)` that contains no name.
fn parse_exposure_ref(dep: &str) -> Option<String> {
    let dep = dep.trim();
    if !dep.starts_with("ref(") {
        return None;
    }

    let inner = dep.trim_start_matches("ref(").trim_end_matches(')');

    // An unquoted argument containing `=` is a keyword argument, not a name.
    let is_keyword_arg =
        |arg: &str| arg.contains('=') && !arg.starts_with(|c| c == '\'' || c == '"');

    let name = inner
        .split(',')
        .map(str::trim)
        .filter(|arg| !is_keyword_arg(arg))
        .last()?
        .trim_matches('\'')
        .trim_matches('"');

    if name.is_empty() {
        None
    } else {
        Some(name.to_string())
    }
}
764
765#[cfg(test)]
766mod tests {
767    use super::*;
768    use crate::parser::discovery::DiscoveredFiles;
769    use std::fs;
770    use std::path::PathBuf;
771
772    #[test]
773    fn test_resolve_ref_model() {
774        let mut node_map = HashMap::new();
775        let graph = &mut LineageGraph::new();
776        let idx = graph.add_node(NodeData {
777            unique_id: "model.orders".to_string(),
778            label: "orders".to_string(),
779            node_type: NodeType::Model,
780            file_path: None,
781            description: None,
782            materialization: None,
783            tags: vec![],
784            columns: vec![],
785            exposure: None,
786        });
787        node_map.insert("model.orders".to_string(), idx);
788
789        assert_eq!(resolve_ref("orders", &node_map), "model.orders");
790    }
791
792    #[test]
793    fn test_resolve_ref_seed() {
794        let mut node_map = HashMap::new();
795        let graph = &mut LineageGraph::new();
796        let idx = graph.add_node(NodeData {
797            unique_id: "seed.countries".to_string(),
798            label: "countries".to_string(),
799            node_type: NodeType::Seed,
800            file_path: None,
801            description: None,
802            materialization: None,
803            tags: vec![],
804            columns: vec![],
805            exposure: None,
806        });
807        node_map.insert("seed.countries".to_string(), idx);
808
809        assert_eq!(resolve_ref("countries", &node_map), "seed.countries");
810    }
811
812    #[test]
813    fn test_resolve_ref_snapshot() {
814        let mut node_map = HashMap::new();
815        let graph = &mut LineageGraph::new();
816        let idx = graph.add_node(NodeData {
817            unique_id: "snapshot.snap_orders".to_string(),
818            label: "snap_orders".to_string(),
819            node_type: NodeType::Snapshot,
820            file_path: None,
821            description: None,
822            materialization: None,
823            tags: vec![],
824            columns: vec![],
825            exposure: None,
826        });
827        node_map.insert("snapshot.snap_orders".to_string(), idx);
828
829        assert_eq!(
830            resolve_ref("snap_orders", &node_map),
831            "snapshot.snap_orders"
832        );
833    }
834
835    #[test]
836    fn test_resolve_ref_unknown_defaults_to_model() {
837        let node_map = HashMap::new();
838        assert_eq!(resolve_ref("unknown_ref", &node_map), "model.unknown_ref");
839    }
840
841    #[test]
842    fn test_parse_exposure_ref() {
843        assert_eq!(
844            parse_exposure_ref("ref('orders')"),
845            Some("orders".to_string())
846        );
847        assert_eq!(
848            parse_exposure_ref("ref(\"orders\")"),
849            Some("orders".to_string())
850        );
851        assert_eq!(parse_exposure_ref("source('raw', 'orders')"), None);
852        assert_eq!(parse_exposure_ref("something_else"), None);
853    }
854
855    /// Helper to create a temporary dbt project for build_graph tests
856    fn setup_temp_project() -> (tempfile::TempDir, PathBuf) {
857        let tmp = tempfile::tempdir().unwrap();
858        let project_dir = tmp.path().to_path_buf();
859
860        // Create model files
861        let models_dir = project_dir.join("models");
862        fs::create_dir_all(&models_dir).unwrap();
863
864        fs::write(
865            models_dir.join("stg_orders.sql"),
866            "SELECT * FROM {{ source('raw', 'orders') }}",
867        )
868        .unwrap();
869
870        fs::write(
871            models_dir.join("orders.sql"),
872            "SELECT * FROM {{ ref('stg_orders') }}",
873        )
874        .unwrap();
875
876        // Create schema YAML with source definition
877        fs::write(
878            models_dir.join("schema.yml"),
879            r#"
880version: 2
881sources:
882  - name: raw
883    tables:
884      - name: orders
885        description: "Raw orders table"
886models:
887  - name: stg_orders
888    description: "Staged orders"
889"#,
890        )
891        .unwrap();
892
893        (tmp, project_dir)
894    }
895
896    #[test]
897    fn test_build_graph_sources_and_models() {
898        let (_tmp, project_dir) = setup_temp_project();
899
900        let files = DiscoveredFiles {
901            model_sql_files: vec![
902                project_dir.join("models/stg_orders.sql"),
903                project_dir.join("models/orders.sql"),
904            ],
905            yaml_files: vec![project_dir.join("models/schema.yml")],
906            ..Default::default()
907        };
908
909        // Use no_cache: false to exercise the cache-enabled path end-to-end
910        let graph = build_graph(&project_dir, &files, None, false, false, &HashMap::new()).unwrap();
911
912        // Should have source + 2 models = 3 nodes
913        assert_eq!(graph.node_count(), 3);
914
915        // Check node types
916        let mut types: Vec<NodeType> = graph.node_indices().map(|i| graph[i].node_type).collect();
917        types.sort_by_key(|t| format!("{:?}", t));
918        assert!(types.contains(&NodeType::Source));
919        assert!(types.iter().filter(|t| **t == NodeType::Model).count() == 2);
920
921        // Should have 2 edges: source→stg_orders, stg_orders→orders
922        assert_eq!(graph.edge_count(), 2);
923    }
924
925    #[test]
926    fn test_build_graph_with_seeds() {
927        let (_tmp, project_dir) = setup_temp_project();
928
929        // Add a seed
930        let seeds_dir = project_dir.join("seeds");
931        fs::create_dir_all(&seeds_dir).unwrap();
932        fs::write(seeds_dir.join("countries.csv"), "id,name\n1,US\n").unwrap();
933
934        let files = DiscoveredFiles {
935            seed_files: vec![project_dir.join("seeds/countries.csv")],
936            ..Default::default()
937        };
938
939        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
940        assert_eq!(graph.node_count(), 1);
941        let node = &graph[graph.node_indices().next().unwrap()];
942        assert_eq!(node.node_type, NodeType::Seed);
943        assert_eq!(node.label, "countries");
944    }
945
946    #[test]
947    fn test_build_graph_with_snapshots() {
948        let (_tmp, project_dir) = setup_temp_project();
949
950        let snap_dir = project_dir.join("snapshots");
951        fs::create_dir_all(&snap_dir).unwrap();
952        fs::write(snap_dir.join("snap_orders.sql"), "SELECT 1").unwrap();
953
954        let files = DiscoveredFiles {
955            snapshot_sql_files: vec![project_dir.join("snapshots/snap_orders.sql")],
956            ..Default::default()
957        };
958
959        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
960        assert_eq!(graph.node_count(), 1);
961        let node = &graph[graph.node_indices().next().unwrap()];
962        assert_eq!(node.node_type, NodeType::Snapshot);
963        assert_eq!(node.label, "snap_orders");
964    }
965
966    #[test]
967    fn test_build_graph_with_tests() {
968        let (_tmp, project_dir) = setup_temp_project();
969
970        let test_dir = project_dir.join("tests");
971        fs::create_dir_all(&test_dir).unwrap();
972        fs::write(
973            test_dir.join("assert_positive.sql"),
974            "SELECT * FROM {{ ref('stg_orders') }} WHERE amount < 0",
975        )
976        .unwrap();
977
978        // Need the model that the test references
979        let models_dir = project_dir.join("models");
980        fs::create_dir_all(&models_dir).unwrap();
981        fs::write(models_dir.join("stg_orders.sql"), "SELECT 1").unwrap();
982
983        let files = DiscoveredFiles {
984            model_sql_files: vec![project_dir.join("models/stg_orders.sql")],
985            test_sql_files: vec![project_dir.join("tests/assert_positive.sql")],
986            ..Default::default()
987        };
988
989        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
990        // model + test = 2 nodes
991        assert_eq!(graph.node_count(), 2);
992        // test edge: stg_orders → assert_positive
993        assert_eq!(graph.edge_count(), 1);
994
995        // Singular SQL tests should use EdgeType::Test
996        use petgraph::visit::IntoEdgeReferences;
997        let edge = graph.edge_references().next().unwrap();
998        assert_eq!(edge.weight().edge_type, EdgeType::Test);
999    }
1000
1001    #[test]
1002    fn test_build_graph_with_exposures() {
1003        let (_tmp, project_dir) = setup_temp_project();
1004
1005        let models_dir = project_dir.join("models");
1006        fs::create_dir_all(&models_dir).unwrap();
1007        fs::write(models_dir.join("orders.sql"), "SELECT 1").unwrap();
1008
1009        fs::write(
1010            models_dir.join("schema.yml"),
1011            r#"
1012version: 2
1013sources: []
1014models: []
1015exposures:
1016  - name: weekly_report
1017    description: "Weekly report dashboard"
1018    depends_on:
1019      - ref('orders')
1020"#,
1021        )
1022        .unwrap();
1023
1024        let files = DiscoveredFiles {
1025            model_sql_files: vec![project_dir.join("models/orders.sql")],
1026            yaml_files: vec![project_dir.join("models/schema.yml")],
1027            ..Default::default()
1028        };
1029
1030        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1031        // model + exposure = 2 nodes
1032        assert_eq!(graph.node_count(), 2);
1033        // exposure edge: orders → weekly_report
1034        assert_eq!(graph.edge_count(), 1);
1035    }
1036
1037    #[test]
1038    fn test_build_graph_ref_resolves_to_seed() {
1039        let tmp = tempfile::tempdir().unwrap();
1040        let project_dir = tmp.path().to_path_buf();
1041
1042        let models_dir = project_dir.join("models");
1043        let seeds_dir = project_dir.join("seeds");
1044        fs::create_dir_all(&models_dir).unwrap();
1045        fs::create_dir_all(&seeds_dir).unwrap();
1046
1047        fs::write(seeds_dir.join("countries.csv"), "id,name\n1,US\n").unwrap();
1048        fs::write(
1049            models_dir.join("stg_countries.sql"),
1050            "SELECT * FROM {{ ref('countries') }}",
1051        )
1052        .unwrap();
1053
1054        let files = DiscoveredFiles {
1055            model_sql_files: vec![project_dir.join("models/stg_countries.sql")],
1056            seed_files: vec![project_dir.join("seeds/countries.csv")],
1057            ..Default::default()
1058        };
1059
1060        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1061        // seed + model = 2 nodes (no phantom)
1062        assert_eq!(graph.node_count(), 2);
1063        // ref edge: countries → stg_countries
1064        assert_eq!(graph.edge_count(), 1);
1065
1066        // Verify the seed node is properly typed (not phantom)
1067        let seed_node = graph
1068            .node_indices()
1069            .find(|&i| graph[i].label == "countries")
1070            .unwrap();
1071        assert_eq!(graph[seed_node].node_type, NodeType::Seed);
1072    }
1073
1074    #[test]
1075    fn test_build_graph_phantom_node_for_unresolved_ref() {
1076        let (_tmp, project_dir) = setup_temp_project();
1077
1078        let models_dir = project_dir.join("models");
1079        fs::create_dir_all(&models_dir).unwrap();
1080        fs::write(
1081            models_dir.join("orders.sql"),
1082            "SELECT * FROM {{ ref('nonexistent_model') }}",
1083        )
1084        .unwrap();
1085
1086        let files = DiscoveredFiles {
1087            model_sql_files: vec![project_dir.join("models/orders.sql")],
1088            ..Default::default()
1089        };
1090
1091        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1092        // model + phantom = 2 nodes
1093        assert_eq!(graph.node_count(), 2);
1094        let phantom = graph
1095            .node_indices()
1096            .find(|&i| graph[i].node_type == NodeType::Phantom)
1097            .expect("Should have a phantom node");
1098        assert_eq!(graph[phantom].label, "nonexistent_model");
1099    }
1100
1101    #[test]
1102    fn test_build_graph_phantom_node_for_unresolved_source() {
1103        let (_tmp, project_dir) = setup_temp_project();
1104
1105        let models_dir = project_dir.join("models");
1106        fs::create_dir_all(&models_dir).unwrap();
1107        fs::write(
1108            models_dir.join("orders.sql"),
1109            "SELECT * FROM {{ source('unknown_src', 'unknown_table') }}",
1110        )
1111        .unwrap();
1112
1113        let files = DiscoveredFiles {
1114            model_sql_files: vec![project_dir.join("models/orders.sql")],
1115            ..Default::default()
1116        };
1117
1118        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1119        // model + phantom source = 2 nodes
1120        assert_eq!(graph.node_count(), 2);
1121        let phantom = graph
1122            .node_indices()
1123            .find(|&i| graph[i].node_type == NodeType::Phantom)
1124            .expect("Should have a phantom source node");
1125        assert_eq!(graph[phantom].label, "unknown_src.unknown_table");
1126    }
1127
1128    #[test]
1129    fn test_build_graph_model_descriptions() {
1130        let (_tmp, project_dir) = setup_temp_project();
1131
1132        let files = DiscoveredFiles {
1133            model_sql_files: vec![project_dir.join("models/stg_orders.sql")],
1134            yaml_files: vec![project_dir.join("models/schema.yml")],
1135            ..Default::default()
1136        };
1137
1138        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1139        let stg = graph
1140            .node_indices()
1141            .find(|&i| graph[i].label == "stg_orders")
1142            .unwrap();
1143        assert_eq!(graph[stg].description.as_deref(), Some("Staged orders"));
1144    }
1145
1146    #[test]
1147    fn test_build_graph_edge_types() {
1148        use petgraph::visit::IntoEdgeReferences;
1149
1150        let (_tmp, project_dir) = setup_temp_project();
1151
1152        let files = DiscoveredFiles {
1153            model_sql_files: vec![
1154                project_dir.join("models/stg_orders.sql"),
1155                project_dir.join("models/orders.sql"),
1156            ],
1157            yaml_files: vec![project_dir.join("models/schema.yml")],
1158            ..Default::default()
1159        };
1160
1161        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1162        let edge_types: Vec<EdgeType> = graph
1163            .edge_references()
1164            .map(|e| e.weight().edge_type)
1165            .collect();
1166        assert!(edge_types.contains(&EdgeType::Source));
1167        assert!(edge_types.contains(&EdgeType::Ref));
1168    }
1169
1170    #[test]
1171    fn test_build_graph_empty_files() {
1172        let tmp = tempfile::tempdir().unwrap();
1173        let files = DiscoveredFiles::default();
1174        let graph = build_graph(tmp.path(), &files, None, true, false, &HashMap::new()).unwrap();
1175        assert_eq!(graph.node_count(), 0);
1176        assert_eq!(graph.edge_count(), 0);
1177    }
1178
1179    #[test]
1180    fn test_build_graph_model_config_merge() {
1181        // Covers lines 168-170: YAML model config with materialization and tags
1182        let tmp = tempfile::tempdir().unwrap();
1183        let project_dir = tmp.path().to_path_buf();
1184
1185        let models_dir = project_dir.join("models");
1186        fs::create_dir_all(&models_dir).unwrap();
1187
1188        fs::write(models_dir.join("stg_orders.sql"), "SELECT 1").unwrap();
1189
1190        fs::write(
1191            models_dir.join("schema.yml"),
1192            r#"
1193version: 2
1194sources: []
1195models:
1196  - name: stg_orders
1197    description: "Staged orders"
1198    tags:
1199      - staging
1200    config:
1201      materialized: table
1202      tags:
1203        - daily
1204"#,
1205        )
1206        .unwrap();
1207
1208        let files = DiscoveredFiles {
1209            model_sql_files: vec![project_dir.join("models/stg_orders.sql")],
1210            yaml_files: vec![project_dir.join("models/schema.yml")],
1211            ..Default::default()
1212        };
1213
1214        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1215        let stg = graph
1216            .node_indices()
1217            .find(|&i| graph[i].label == "stg_orders")
1218            .unwrap();
1219        assert_eq!(graph[stg].materialization.as_deref(), Some("table"));
1220        assert!(graph[stg].tags.contains(&"staging".to_string()));
1221        assert!(graph[stg].tags.contains(&"daily".to_string()));
1222    }
1223
1224    #[test]
1225    fn test_build_graph_duplicate_model_name() {
1226        // Covers line 197: duplicate model name warning
1227        let tmp = tempfile::tempdir().unwrap();
1228        let project_dir = tmp.path().to_path_buf();
1229
1230        let models_dir = project_dir.join("models");
1231        let subdir = models_dir.join("subdir");
1232        fs::create_dir_all(&subdir).unwrap();
1233
1234        fs::write(models_dir.join("orders.sql"), "SELECT 1").unwrap();
1235        fs::write(subdir.join("orders.sql"), "SELECT 2").unwrap();
1236
1237        let files = DiscoveredFiles {
1238            model_sql_files: vec![
1239                project_dir.join("models/orders.sql"),
1240                project_dir.join("models/subdir/orders.sql"),
1241            ],
1242            ..Default::default()
1243        };
1244
1245        // Should not panic, just warn on stderr about the duplicate
1246        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1247        // Both SQL files produce nodes (duplicate warning is informational)
1248        let order_nodes: Vec<_> = graph
1249            .node_indices()
1250            .filter(|&i| graph[i].label == "orders")
1251            .collect();
1252        assert_eq!(order_nodes.len(), 2);
1253    }
1254
1255    #[test]
1256    fn test_build_graph_file_paths_are_relative() {
1257        let (_tmp, project_dir) = setup_temp_project();
1258
1259        let files = DiscoveredFiles {
1260            model_sql_files: vec![
1261                project_dir.join("models/stg_orders.sql"),
1262                project_dir.join("models/orders.sql"),
1263            ],
1264            yaml_files: vec![project_dir.join("models/schema.yml")],
1265            ..Default::default()
1266        };
1267
1268        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1269
1270        for idx in graph.node_indices() {
1271            let node = &graph[idx];
1272            if let Some(ref fp) = node.file_path {
1273                assert!(
1274                    fp.is_relative(),
1275                    "file_path for node '{}' should be relative but got: {}",
1276                    node.label,
1277                    fp.display()
1278                );
1279                assert!(
1280                    !fp.starts_with(&project_dir),
1281                    "file_path for node '{}' should not start with project_dir: {}",
1282                    node.label,
1283                    fp.display()
1284                );
1285            }
1286        }
1287
1288        // Verify source node specifically has relative path
1289        let source_node = graph
1290            .node_indices()
1291            .find(|&i| graph[i].node_type == NodeType::Source)
1292            .expect("should have a source node");
1293        assert_eq!(
1294            graph[source_node].file_path.as_deref(),
1295            Some(std::path::Path::new("models/schema.yml"))
1296        );
1297
1298        // Verify model node has relative path
1299        let model_node = graph
1300            .node_indices()
1301            .find(|&i| graph[i].label == "stg_orders")
1302            .unwrap();
1303        assert_eq!(
1304            graph[model_node].file_path.as_deref(),
1305            Some(std::path::Path::new("models/stg_orders.sql"))
1306        );
1307    }
1308
1309    #[test]
1310    fn test_build_graph_with_macros() {
1311        let tmp = tempfile::tempdir().unwrap();
1312        let project_dir = tmp.path().to_path_buf();
1313
1314        let models_dir = project_dir.join("models");
1315        let macros_dir = project_dir.join("macros");
1316        fs::create_dir_all(&models_dir).unwrap();
1317        fs::create_dir_all(&macros_dir).unwrap();
1318
1319        // Macro that references a model
1320        fs::write(
1321            macros_dir.join("my_macro.sql"),
1322            r#"
1323{% macro my_cte() %}
1324    SELECT * FROM {{ ref('base_table') }}
1325{% endmacro %}
1326"#,
1327        )
1328        .unwrap();
1329
1330        // Model that uses the macro
1331        fs::write(models_dir.join("base_table.sql"), "SELECT 1 as id").unwrap();
1332        fs::write(
1333            models_dir.join("derived.sql"),
1334            "SELECT * FROM ({{ my_cte() }})",
1335        )
1336        .unwrap();
1337
1338        let files = DiscoveredFiles {
1339            model_sql_files: vec![
1340                project_dir.join("models/base_table.sql"),
1341                project_dir.join("models/derived.sql"),
1342            ],
1343            macro_sql_files: vec![project_dir.join("macros/my_macro.sql")],
1344            ..Default::default()
1345        };
1346
1347        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1348        // base_table + derived = 2 nodes
1349        assert_eq!(graph.node_count(), 2);
1350        // ref edge: base_table → derived
1351        assert_eq!(graph.edge_count(), 1);
1352
1353        // Verify the edge direction
1354        let base = graph
1355            .node_indices()
1356            .find(|&i| graph[i].label == "base_table")
1357            .unwrap();
1358        let derived = graph
1359            .node_indices()
1360            .find(|&i| graph[i].label == "derived")
1361            .unwrap();
1362        assert!(graph.contains_edge(base, derived));
1363    }
1364
1365    #[test]
1366    fn test_var_list_expansion_resolves_refs() {
1367        let tmp = tempfile::tempdir().unwrap();
1368        let project_dir = tmp.path().to_path_buf();
1369        let models_dir = project_dir.join("models");
1370        fs::create_dir_all(&models_dir).unwrap();
1371
1372        // dbt_project.yml with vars
1373        fs::write(project_dir.join("dbt_project.yml"), "name: var_test\n").unwrap();
1374
1375        // Model that uses var() to iterate over categories and ref dynamically
1376        fs::write(
1377            models_dir.join("combined.sql"),
1378            r#"
1379            {%- set categories = var("product_categories") -%}
1380            {%- for cat in categories -%}
1381                SELECT * FROM {{ ref('stg_' ~ cat ~ '_summary') }}
1382                {% if not loop.last %}UNION ALL{% endif %}
1383            {%- endfor -%}
1384            "#,
1385        )
1386        .unwrap();
1387
1388        // Stub models that the refs point to
1389        fs::write(models_dir.join("stg_electronics_summary.sql"), "SELECT 1").unwrap();
1390        fs::write(models_dir.join("stg_clothing_summary.sql"), "SELECT 1").unwrap();
1391
1392        let files = DiscoveredFiles {
1393            model_sql_files: vec![
1394                project_dir.join("models/combined.sql"),
1395                project_dir.join("models/stg_electronics_summary.sql"),
1396                project_dir.join("models/stg_clothing_summary.sql"),
1397            ],
1398            ..Default::default()
1399        };
1400
1401        // Provide project-level vars
1402        let mut vars = HashMap::new();
1403        vars.insert(
1404            "product_categories".to_string(),
1405            serde_json::json!(["electronics", "clothing"]),
1406        );
1407
1408        let graph = build_graph(&project_dir, &files, None, true, false, &vars).unwrap();
1409
1410        // 3 model nodes: combined + stg_electronics_summary + stg_clothing_summary
1411        assert_eq!(graph.node_count(), 3);
1412
1413        // 2 edges: stg_electronics_summary → combined, stg_clothing_summary → combined
1414        assert_eq!(graph.edge_count(), 2);
1415
1416        let combined = graph
1417            .node_indices()
1418            .find(|&i| graph[i].label == "combined")
1419            .unwrap();
1420        let electronics = graph
1421            .node_indices()
1422            .find(|&i| graph[i].label == "stg_electronics_summary")
1423            .unwrap();
1424        let clothing = graph
1425            .node_indices()
1426            .find(|&i| graph[i].label == "stg_clothing_summary")
1427            .unwrap();
1428        assert!(graph.contains_edge(electronics, combined));
1429        assert!(graph.contains_edge(clothing, combined));
1430    }
1431
1432    #[test]
1433    fn test_generic_tests_from_yaml() {
1434        let tmp = tempfile::tempdir().unwrap();
1435        let project_dir = tmp.path().to_path_buf();
1436        let models_dir = project_dir.join("models");
1437        fs::create_dir_all(&models_dir).unwrap();
1438
1439        fs::write(project_dir.join("dbt_project.yml"), "name: test_proj\n").unwrap();
1440        fs::write(models_dir.join("orders.sql"), "SELECT 1 AS order_id").unwrap();
1441
1442        // Schema with generic tests on columns
1443        fs::write(
1444            models_dir.join("schema.yml"),
1445            r#"
1446sources:
1447  - name: raw
1448    tables:
1449      - name: events
1450        columns:
1451          - name: event_id
1452            data_tests:
1453              - not_null
1454models:
1455  - name: orders
1456    data_tests:
1457      - dbt_utils.expression_is_true:
1458          expression: "a = b"
1459      - dbt_utils.expression_is_true:
1460          expression: "c = d"
1461    columns:
1462      - name: order_id
1463        data_tests:
1464          - not_null
1465          - unique
1466"#,
1467        )
1468        .unwrap();
1469
1470        let files = DiscoveredFiles {
1471            model_sql_files: vec![project_dir.join("models/orders.sql")],
1472            yaml_files: vec![project_dir.join("models/schema.yml")],
1473            ..Default::default()
1474        };
1475
1476        let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1477
1478        // 1 model + 1 source + 2 column tests + 1 source test + 2 model-level tests = 7
1479        assert_eq!(graph.node_count(), 7);
1480
1481        let test_nodes: Vec<_> = graph
1482            .node_indices()
1483            .filter(|&i| graph[i].node_type == NodeType::Test)
1484            .collect();
1485        assert_eq!(test_nodes.len(), 5);
1486
1487        // Verify test unique_ids
1488        let mut test_ids: Vec<&str> = test_nodes
1489            .iter()
1490            .map(|&i| graph[i].unique_id.as_str())
1491            .collect();
1492        test_ids.sort();
1493        assert!(test_ids.contains(&"test.not_null.orders.order_id"));
1494        assert!(test_ids.contains(&"test.unique.orders.order_id"));
1495        assert!(test_ids.contains(&"test.not_null.raw.events.event_id"));
1496        // Model-level tests: first gets base ID, second gets _2 suffix
1497        assert!(test_ids.contains(&"test.dbt_utils.expression_is_true.orders"));
1498        assert!(test_ids.contains(&"test.dbt_utils.expression_is_true.orders_2"));
1499
1500        // All test edges from model (2 column + 2 model-level = 4)
1501        let model_idx = graph
1502            .node_indices()
1503            .find(|&i| graph[i].unique_id == "model.orders")
1504            .unwrap();
1505        let source_idx = graph
1506            .node_indices()
1507            .find(|&i| graph[i].unique_id == "source.raw.events")
1508            .unwrap();
1509
1510        let model_test_edges = graph
1511            .edges_directed(model_idx, petgraph::Direction::Outgoing)
1512            .filter(|e| e.weight().edge_type == EdgeType::Test)
1513            .count();
1514        assert_eq!(model_test_edges, 4);
1515
1516        let source_test_edges = graph
1517            .edges_directed(source_idx, petgraph::Direction::Outgoing)
1518            .filter(|e| e.weight().edge_type == EdgeType::Test)
1519            .count();
1520        assert_eq!(source_test_edges, 1);
1521
1522        // All generic test nodes should have file_path pointing to the YAML file
1523        for &ti in &test_nodes {
1524            assert_eq!(
1525                graph[ti].file_path.as_deref(),
1526                Some(std::path::Path::new("models/schema.yml")),
1527                "test node '{}' should have file_path",
1528                graph[ti].unique_id,
1529            );
1530        }
1531
1532        // Deduped test labels must also be distinct (suffix applied to label)
1533        let mut test_labels: Vec<&str> = test_nodes
1534            .iter()
1535            .map(|&i| graph[i].label.as_str())
1536            .collect();
1537        test_labels.sort();
1538        let deduped_len = test_labels.len();
1539        test_labels.dedup();
1540        assert_eq!(
1541            test_labels.len(),
1542            deduped_len,
1543            "All test labels should be unique"
1544        );
1545        // Verify the deduped model-level test labels
1546        assert!(test_labels.contains(&"dbt_utils.expression_is_true_orders"));
1547        assert!(test_labels.contains(&"dbt_utils.expression_is_true_orders_2"));
1548    }
1549
1550    #[test]
1551    fn test_generic_test_ids_deterministic_across_yaml_order() {
1552        // Duplicate test names across two YAML files should produce the same
1553        // suffixed IDs regardless of the order the files are passed in.
1554        let tmp = tempfile::tempdir().unwrap();
1555        let project_dir = tmp.path().to_path_buf();
1556        let models_dir = project_dir.join("models");
1557        let sub_dir = models_dir.join("sub");
1558        fs::create_dir_all(&sub_dir).unwrap();
1559
1560        fs::write(models_dir.join("orders.sql"), "SELECT 1 AS order_id").unwrap();
1561
1562        // Two YAML files that both declare a not_null test on orders.order_id
1563        let yaml_a = models_dir.join("a_schema.yml");
1564        let yaml_b = sub_dir.join("b_schema.yml");
1565        let yaml_content = r#"
1566models:
1567  - name: orders
1568    columns:
1569      - name: order_id
1570        data_tests:
1571          - not_null
1572"#;
1573        fs::write(&yaml_a, yaml_content).unwrap();
1574        fs::write(&yaml_b, yaml_content).unwrap();
1575
1576        // Build with files in forward order
1577        let files_fwd = DiscoveredFiles {
1578            model_sql_files: vec![project_dir.join("models/orders.sql")],
1579            yaml_files: vec![yaml_a.clone(), yaml_b.clone()],
1580            ..Default::default()
1581        };
1582        let graph_fwd =
1583            build_graph(&project_dir, &files_fwd, None, true, false, &HashMap::new()).unwrap();
1584
1585        // Build with files in reverse order
1586        let files_rev = DiscoveredFiles {
1587            model_sql_files: vec![project_dir.join("models/orders.sql")],
1588            yaml_files: vec![yaml_b, yaml_a],
1589            ..Default::default()
1590        };
1591        let graph_rev =
1592            build_graph(&project_dir, &files_rev, None, true, false, &HashMap::new()).unwrap();
1593
1594        // Both should produce the same set of test unique_ids
1595        let mut ids_fwd: Vec<String> = graph_fwd
1596            .node_indices()
1597            .filter(|&i| graph_fwd[i].node_type == NodeType::Test)
1598            .map(|i| graph_fwd[i].unique_id.clone())
1599            .collect();
1600        ids_fwd.sort();
1601
1602        let mut ids_rev: Vec<String> = graph_rev
1603            .node_indices()
1604            .filter(|&i| graph_rev[i].node_type == NodeType::Test)
1605            .map(|i| graph_rev[i].unique_id.clone())
1606            .collect();
1607        ids_rev.sort();
1608
1609        assert_eq!(ids_fwd, ids_rev);
1610        assert_eq!(ids_fwd.len(), 2);
1611        assert!(ids_fwd.contains(&"test.not_null.orders.order_id".to_string()));
1612        assert!(ids_fwd.contains(&"test.not_null.orders.order_id_2".to_string()));
1613    }
1614}