1use anyhow::Result;
2use petgraph::stable_graph::NodeIndex;
3use rayon::prelude::*;
4use std::collections::HashMap;
5use std::path::Path;
6
7use std::path::PathBuf;
8
9use crate::graph::types::{ExposureInfo, OwnerInfo};
10use crate::parser::cache;
11use crate::parser::columns::extract_select_columns;
12use crate::parser::discovery::DiscoveredFiles;
13use crate::parser::jinja::JinjaExtraction;
14use crate::parser::sql::{
15 RefCall, SourceCall, extract_all_with_vars, extract_refs_and_sources_with_vars,
16};
17use crate::parser::yaml_schema::{ExposureDefinition, SchemaFile, parse_schema_file};
18
19fn load_macro_prefix(files: &DiscoveredFiles) -> String {
22 let sources: Vec<String> = files
23 .macro_sql_files
24 .iter()
25 .filter_map(|path| match std::fs::read_to_string(path) {
26 Ok(content) => Some(content),
27 Err(e) => {
28 crate::warn!("could not read macro file {}: {}", path.display(), e);
29 None
30 }
31 })
32 .collect();
33 crate::parser::jinja::build_macro_prefix(&sources)
34}
35
36use super::types::*;
37
38struct GraphBuilder {
40 graph: LineageGraph,
41 node_map: HashMap<String, NodeIndex>,
42}
43
44impl GraphBuilder {
45 fn new() -> Self {
46 Self {
47 graph: LineageGraph::new(),
48 node_map: HashMap::new(),
49 }
50 }
51
52 fn add_node(&mut self, data: NodeData) -> NodeIndex {
54 let idx = self.graph.add_node(data);
55 let unique_id = self.graph[idx].unique_id.clone();
56 self.node_map.insert(unique_id, idx);
57 idx
58 }
59
60 fn get_or_create_phantom_ref(&mut self, ref_name: &str, sql_path: &Path) -> NodeIndex {
62 let dep_id = resolve_ref(ref_name, &self.node_map);
63 if let Some(&idx) = self.node_map.get(&dep_id) {
64 return idx;
65 }
66 crate::warn!("unresolved ref '{}' in {}", ref_name, sql_path.display());
67 let phantom_id = format!("model.{}", ref_name);
68 self.add_node(NodeData {
69 unique_id: phantom_id,
70 label: ref_name.to_string(),
71 node_type: NodeType::Phantom,
72 file_path: None,
73 description: None,
74 materialization: None,
75 tags: vec![],
76 columns: vec![],
77 exposure: None,
78 })
79 }
80
81 fn get_or_create_phantom_source(
83 &mut self,
84 source_name: &str,
85 table_name: &str,
86 sql_path: &Path,
87 ) -> NodeIndex {
88 let source_id = format!("source.{}.{}", source_name, table_name);
89 if let Some(&idx) = self.node_map.get(&source_id) {
90 return idx;
91 }
92 crate::warn!(
93 "unresolved source '{}.{}' in {}",
94 source_name,
95 table_name,
96 sql_path.display()
97 );
98 let label = format!("{}.{}", source_name, table_name);
99 self.add_node(NodeData {
100 unique_id: source_id,
101 label,
102 node_type: NodeType::Phantom,
103 file_path: None,
104 description: None,
105 materialization: None,
106 tags: vec![],
107 columns: vec![],
108 exposure: None,
109 })
110 }
111}
112
113fn read_file(path: &Path) -> Result<String> {
115 std::fs::read_to_string(path).map_err(|e| {
116 crate::error::DbtLineageError::FileReadError {
117 path: path.to_path_buf(),
118 source: e,
119 }
120 .into()
121 })
122}
123
124fn file_stem_str(path: &Path) -> String {
126 path.file_stem()
127 .and_then(|s| s.to_str())
128 .unwrap_or("unknown")
129 .to_string()
130}
131
132fn add_source_nodes(
134 gb: &mut GraphBuilder,
135 schema: &crate::parser::yaml_schema::SchemaFile,
136 yaml_path: &Path,
137 project_dir: &Path,
138) {
139 let relative_path = yaml_path
140 .strip_prefix(project_dir)
141 .unwrap_or(yaml_path)
142 .to_path_buf();
143 for source_def in &schema.sources {
144 for table in &source_def.tables {
145 let unique_id = format!("source.{}.{}", source_def.name, table.name);
146 let label = format!("{}.{}", source_def.name, table.name);
147 gb.add_node(NodeData {
148 unique_id,
149 label,
150 node_type: NodeType::Source,
151 file_path: Some(relative_path.clone()),
152 description: table
153 .description
154 .clone()
155 .or_else(|| source_def.description.clone()),
156 materialization: None,
157 tags: vec![],
158 columns: vec![],
159 exposure: None,
160 });
161 }
162 }
163}
164
165#[derive(Clone, Default)]
167struct YamlModelMeta {
168 description: Option<String>,
169 materialization: Option<String>,
170 tags: Vec<String>,
171 columns: Vec<String>,
172}
173
174type YamlResult = (
178 HashMap<String, YamlModelMeta>,
179 Vec<ExposureDefinition>,
180 Vec<(SchemaFile, PathBuf)>,
181);
182
183fn process_yaml_files(
186 gb: &mut GraphBuilder,
187 files: &DiscoveredFiles,
188 project_dir: &Path,
189) -> Result<YamlResult> {
190 let mut model_meta: HashMap<String, YamlModelMeta> = HashMap::new();
191 let mut exposures: Vec<ExposureDefinition> = Vec::new();
192 let mut schemas: Vec<(SchemaFile, PathBuf)> = Vec::new();
193
194 let mut sorted_yaml_files = files.yaml_files.clone();
197 sorted_yaml_files.sort();
198
199 for yaml_path in &sorted_yaml_files {
200 let content = read_file(yaml_path)?;
201 let schema = match parse_schema_file(&content, Some(yaml_path.as_path())) {
202 Ok(s) => s,
203 Err(_) => continue,
204 };
205
206 add_source_nodes(gb, &schema, yaml_path, project_dir);
207
208 for model_def in &schema.models {
209 let mut meta = YamlModelMeta {
210 description: model_def.description.clone(),
211 columns: model_def.columns.iter().map(|c| c.name.clone()).collect(),
212 ..Default::default()
213 };
214 let mut tags = model_def.tags.clone();
216 if let Some(cfg) = &model_def.config {
217 meta.materialization = cfg.materialized.clone();
218 tags.extend(cfg.tags.clone());
219 }
220 tags.sort();
221 tags.dedup();
222 meta.tags = tags;
223 model_meta.insert(model_def.name.clone(), meta);
224 }
225
226 exposures.extend(schema.exposures.iter().cloned());
227 let relative_path = yaml_path
228 .strip_prefix(project_dir)
229 .unwrap_or(yaml_path)
230 .to_path_buf();
231 schemas.push((schema, relative_path));
232 }
233
234 Ok((model_meta, exposures, schemas))
235}
236
237type ExtractionCache = HashMap<PathBuf, (Vec<RefCall>, Vec<SourceCall>)>;
240
241struct ModelExtraction {
243 sql_path: PathBuf,
244 model_name: String,
245 extraction: Option<JinjaExtraction>,
246 columns: Vec<String>,
247 from_cache: bool,
249}
250
251fn process_model_files(
255 gb: &mut GraphBuilder,
256 files: &DiscoveredFiles,
257 project_dir: &Path,
258 model_meta: &HashMap<String, YamlModelMeta>,
259 macro_prefix: &str,
260 disk_cache: &mut cache::ExtractionCache,
261 vars: &HashMap<String, serde_json::Value>,
262) -> ExtractionCache {
263 let cache_ref = &*disk_cache;
266 let extractions: Vec<ModelExtraction> = files
267 .model_sql_files
268 .par_iter()
269 .map(|sql_path| {
270 let model_name = file_stem_str(sql_path);
271
272 if let Some(cached) = cache_ref.get(sql_path, project_dir) {
274 let sql_content = std::fs::read_to_string(sql_path).ok();
275 let columns = sql_content
276 .as_ref()
277 .map(|content| extract_select_columns(content))
278 .unwrap_or_default();
279 return ModelExtraction {
280 sql_path: sql_path.clone(),
281 model_name,
282 extraction: Some(cached.clone()),
283 columns,
284 from_cache: true,
285 };
286 }
287
288 let sql_content = std::fs::read_to_string(sql_path).ok();
289
290 let extraction = sql_content
291 .as_ref()
292 .map(|content| extract_all_with_vars(content, macro_prefix, vars));
293
294 let columns = sql_content
295 .as_ref()
296 .map(|content| extract_select_columns(content))
297 .unwrap_or_default();
298
299 ModelExtraction {
300 sql_path: sql_path.clone(),
301 model_name,
302 extraction,
303 columns,
304 from_cache: false,
305 }
306 })
307 .collect();
308
309 let mut model_name_paths: HashMap<String, std::path::PathBuf> = HashMap::new();
311 let mut mem_cache: ExtractionCache = HashMap::new();
312
313 for me in extractions {
314 if let Some(existing_path) = model_name_paths.get(&me.model_name) {
315 crate::warn!(
316 "duplicate model name '{}' in {} and {}",
317 me.model_name,
318 existing_path.display(),
319 me.sql_path.display()
320 );
321 }
322 model_name_paths.insert(me.model_name.clone(), me.sql_path.clone());
323
324 let from_cache = me.from_cache;
325 let (sql_config, cached_refs_sources) = match me.extraction {
326 Some(ext) => {
327 if !from_cache {
329 disk_cache.insert(&me.sql_path, project_dir, &ext);
330 }
331 (ext.config, Some((ext.refs, ext.sources)))
332 }
333 None => (Default::default(), None),
334 };
335
336 if let Some(rs) = cached_refs_sources {
337 mem_cache.insert(me.sql_path.clone(), rs);
338 }
339
340 let yaml_meta = model_meta.get(&me.model_name);
341
342 let materialization = sql_config
343 .materialized
344 .or_else(|| yaml_meta.and_then(|m| m.materialization.clone()));
345
346 let mut tags = sql_config.tags;
347 if let Some(meta) = yaml_meta {
348 tags.extend(meta.tags.clone());
349 }
350 tags.sort();
351 tags.dedup();
352
353 let unique_id = format!("model.{}", me.model_name);
354 let relative_path = me
355 .sql_path
356 .strip_prefix(project_dir)
357 .unwrap_or(&me.sql_path)
358 .to_path_buf();
359
360 let columns = match yaml_meta {
362 Some(m) if !m.columns.is_empty() => m.columns.clone(),
363 _ => me.columns,
364 };
365
366 gb.add_node(NodeData {
367 unique_id,
368 label: me.model_name,
369 node_type: NodeType::Model,
370 file_path: Some(relative_path),
371 description: yaml_meta.and_then(|m| m.description.clone()),
372 materialization,
373 tags,
374 columns,
375 exposure: None,
376 });
377 }
378
379 mem_cache
380}
381
382fn process_simple_nodes(
384 gb: &mut GraphBuilder,
385 paths: &[std::path::PathBuf],
386 project_dir: &Path,
387 prefix: &str,
388 node_type: NodeType,
389) {
390 for path in paths {
391 let name = file_stem_str(path);
392 let unique_id = format!("{}.{}", prefix, name);
393 let relative_path = path.strip_prefix(project_dir).unwrap_or(path).to_path_buf();
394
395 gb.add_node(NodeData {
396 unique_id,
397 label: name,
398 node_type,
399 file_path: Some(relative_path),
400 description: None,
401 materialization: None,
402 tags: vec![],
403 columns: vec![],
404 exposure: None,
405 });
406 }
407}
408
409fn process_sql_edges(
413 gb: &mut GraphBuilder,
414 files: &DiscoveredFiles,
415 project_dir: &Path,
416 macro_prefix: &str,
417 extraction_cache: &ExtractionCache,
418 vars: &HashMap<String, serde_json::Value>,
419) -> Result<()> {
420 let all_sql_files: Vec<(&std::path::PathBuf, &str)> = files
421 .model_sql_files
422 .iter()
423 .map(|p| (p, "model"))
424 .chain(files.snapshot_sql_files.iter().map(|p| (p, "snapshot")))
425 .chain(files.test_sql_files.iter().map(|p| (p, "test")))
426 .collect();
427
428 for (sql_path, file_type) in &all_sql_files {
429 let node_name = file_stem_str(sql_path);
430 let node_unique_id = format!("{}.{}", file_type, node_name);
431
432 if *file_type == "test" {
434 let relative_path = sql_path
435 .strip_prefix(project_dir)
436 .unwrap_or(sql_path)
437 .to_path_buf();
438 gb.add_node(NodeData {
439 unique_id: node_unique_id.clone(),
440 label: node_name,
441 node_type: NodeType::Test,
442 file_path: Some(relative_path),
443 description: None,
444 materialization: None,
445 tags: vec![],
446 columns: vec![],
447 exposure: None,
448 });
449 }
450
451 let current_idx = match gb.node_map.get(&node_unique_id) {
452 Some(&idx) => idx,
453 None => continue,
454 };
455
456 let owned;
458 let (refs, sources) = if let Some(cached) = extraction_cache.get(*sql_path) {
459 (&cached.0, &cached.1)
460 } else {
461 let content = read_file(sql_path)?;
462 owned = extract_refs_and_sources_with_vars(&content, macro_prefix, vars);
463 (&owned.0, &owned.1)
464 };
465
466 let is_test = *file_type == "test";
469
470 for ref_call in refs {
471 let dep_idx = gb.get_or_create_phantom_ref(&ref_call.name, sql_path);
472 let edge_type = if is_test {
473 EdgeType::Test
474 } else {
475 EdgeType::Ref
476 };
477 gb.graph
478 .add_edge(dep_idx, current_idx, EdgeData::direct(edge_type));
479 }
480
481 for source_call in sources {
482 let source_idx = gb.get_or_create_phantom_source(
483 &source_call.source_name,
484 &source_call.table_name,
485 sql_path,
486 );
487 let edge_type = if is_test {
488 EdgeType::Test
489 } else {
490 EdgeType::Source
491 };
492 gb.graph
493 .add_edge(source_idx, current_idx, EdgeData::direct(edge_type));
494 }
495 }
496
497 Ok(())
498}
499
500fn process_exposures(gb: &mut GraphBuilder, exposures: &[ExposureDefinition]) {
502 for exposure in exposures {
503 let unique_id = format!("exposure.{}", exposure.name);
504 let idx = gb.add_node(NodeData {
505 unique_id,
506 label: exposure.name.clone(),
507 node_type: NodeType::Exposure,
508 file_path: None,
509 description: exposure.description.clone(),
510 materialization: None,
511 tags: vec![],
512 columns: vec![],
513 exposure: Some(ExposureInfo {
514 label: exposure.label.clone(),
515 exposure_type: exposure.exposure_type.clone(),
516 url: exposure.url.clone(),
517 maturity: exposure.maturity.clone(),
518 owner: exposure.owner.as_ref().map(|o| OwnerInfo {
519 name: o.name.as_ref().filter(|s| !s.trim().is_empty()).cloned(),
520 email: o.email.as_ref().filter(|s| !s.trim().is_empty()).cloned(),
521 }),
522 }),
523 });
524
525 for dep in &exposure.depends_on {
526 if let Some(model_name) = parse_exposure_ref(dep) {
527 let dep_id = resolve_ref(&model_name, &gb.node_map);
528 if let Some(&dep_idx) = gb.node_map.get(&dep_id) {
529 gb.graph
530 .add_edge(dep_idx, idx, EdgeData::direct(EdgeType::Exposure));
531 }
532 }
533 }
534 }
535}
536
537fn dedup_unique_id(
542 candidate: &str,
543 node_map: &HashMap<String, NodeIndex>,
544) -> (String, Option<String>) {
545 if !node_map.contains_key(candidate) {
546 return (candidate.to_string(), None);
547 }
548 let mut n = 2u32;
549 loop {
550 let suffix = format!("_{}", n);
551 let suffixed = format!("{}{}", candidate, suffix);
552 if !node_map.contains_key(&suffixed) {
553 return (suffixed, Some(suffix));
554 }
555 n += 1;
556 }
557}
558
559fn add_generic_test_node(
561 gb: &mut GraphBuilder,
562 parent_idx: NodeIndex,
563 unique_id: String,
564 label: String,
565 file_path: Option<PathBuf>,
566) {
567 let idx = gb.add_node(NodeData {
568 unique_id,
569 label,
570 node_type: NodeType::Test,
571 file_path,
572 description: None,
573 materialization: None,
574 tags: vec![],
575 columns: vec![],
576 exposure: None,
577 });
578 gb.graph
579 .add_edge(parent_idx, idx, EdgeData::direct(EdgeType::Test));
580}
581
582fn process_generic_tests(gb: &mut GraphBuilder, schemas: &[(SchemaFile, PathBuf)]) {
585 for (schema, yaml_path) in schemas {
586 let file_path = Some(yaml_path.clone());
587
588 for model_def in &schema.models {
590 let parent_id = format!("model.{}", model_def.name);
591 let parent_idx = match gb.node_map.get(&parent_id) {
592 Some(&idx) => idx,
593 None => continue,
594 };
595
596 for test_def in &model_def.tests {
598 let test_name = match test_def.test_name() {
599 Some(name) => name,
600 None => continue,
601 };
602 let candidate = format!("test.{}.{}", test_name, model_def.name);
603 let (unique_id, suffix) = dedup_unique_id(&candidate, &gb.node_map);
604 let mut label = format!("{}_{}", test_name, model_def.name);
605 if let Some(s) = suffix {
606 label.push_str(&s);
607 }
608 add_generic_test_node(gb, parent_idx, unique_id, label, file_path.clone());
609 }
610
611 for col in &model_def.columns {
613 for test_def in &col.tests {
614 let test_name = match test_def.test_name() {
615 Some(name) => name,
616 None => continue,
617 };
618 let candidate = format!("test.{}.{}.{}", test_name, model_def.name, col.name);
619 let (unique_id, suffix) = dedup_unique_id(&candidate, &gb.node_map);
620 let mut label = format!("{}_{}_{}", test_name, model_def.name, col.name);
621 if let Some(s) = suffix {
622 label.push_str(&s);
623 }
624 add_generic_test_node(gb, parent_idx, unique_id, label, file_path.clone());
625 }
626 }
627 }
628
629 for source_def in &schema.sources {
631 for table in &source_def.tables {
632 let parent_id = format!("source.{}.{}", source_def.name, table.name);
633 let parent_idx = match gb.node_map.get(&parent_id) {
634 Some(&idx) => idx,
635 None => continue,
636 };
637 for col in &table.columns {
638 for test_def in &col.tests {
639 let test_name = match test_def.test_name() {
640 Some(name) => name,
641 None => continue,
642 };
643 let candidate = format!(
644 "test.{}.{}.{}.{}",
645 test_name, source_def.name, table.name, col.name
646 );
647 let (unique_id, suffix) = dedup_unique_id(&candidate, &gb.node_map);
648 let mut label = format!(
649 "{}_{}_{}_{}",
650 test_name, source_def.name, table.name, col.name
651 );
652 if let Some(s) = suffix {
653 label.push_str(&s);
654 }
655 add_generic_test_node(gb, parent_idx, unique_id, label, file_path.clone());
656 }
657 }
658 }
659 }
660 }
661}
662
663pub fn build_graph(
670 project_dir: &Path,
671 files: &DiscoveredFiles,
672 cache_dir: Option<&Path>,
673 no_cache: bool,
674 refresh_cache: bool,
675 vars: &HashMap<String, serde_json::Value>,
676) -> Result<LineageGraph> {
677 let mut gb = GraphBuilder::new();
678 let macro_prefix = load_macro_prefix(files);
679 let mut disk_cache = if no_cache {
680 cache::ExtractionCache::disabled()
681 } else if refresh_cache {
682 cache::ExtractionCache::fresh(project_dir, ¯o_prefix, vars, cache_dir)
683 } else {
684 cache::ExtractionCache::load(project_dir, ¯o_prefix, vars, cache_dir)
685 };
686
687 let (model_meta, exposures, schemas) = process_yaml_files(&mut gb, files, project_dir)?;
688 let extraction_cache = process_model_files(
689 &mut gb,
690 files,
691 project_dir,
692 &model_meta,
693 ¯o_prefix,
694 &mut disk_cache,
695 vars,
696 );
697 process_simple_nodes(
698 &mut gb,
699 &files.seed_files,
700 project_dir,
701 "seed",
702 NodeType::Seed,
703 );
704 process_simple_nodes(
705 &mut gb,
706 &files.snapshot_sql_files,
707 project_dir,
708 "snapshot",
709 NodeType::Snapshot,
710 );
711 process_sql_edges(
712 &mut gb,
713 files,
714 project_dir,
715 ¯o_prefix,
716 &extraction_cache,
717 vars,
718 )?;
719 process_exposures(&mut gb, &exposures);
720 process_generic_tests(&mut gb, &schemas);
721
722 disk_cache.save();
723
724 Ok(gb.graph)
725}
726
727fn resolve_ref(name: &str, node_map: &HashMap<String, NodeIndex>) -> String {
729 let model_id = format!("model.{}", name);
731 if node_map.contains_key(&model_id) {
732 return model_id;
733 }
734
735 let seed_id = format!("seed.{}", name);
736 if node_map.contains_key(&seed_id) {
737 return seed_id;
738 }
739
740 let snapshot_id = format!("snapshot.{}", name);
741 if node_map.contains_key(&snapshot_id) {
742 return snapshot_id;
743 }
744
745 model_id
747}
748
749fn parse_exposure_ref(dep: &str) -> Option<String> {
751 let dep = dep.trim();
752 if dep.starts_with("ref(") {
753 let inner = dep.trim_start_matches("ref(").trim_end_matches(')');
755 let name = inner.trim().trim_matches('\'').trim_matches('"');
756 Some(name.to_string())
757 } else if dep.starts_with("source(") {
758 None
760 } else {
761 None
762 }
763}
764
765#[cfg(test)]
766mod tests {
767 use super::*;
768 use crate::parser::discovery::DiscoveredFiles;
769 use std::fs;
770 use std::path::PathBuf;
771
772 #[test]
773 fn test_resolve_ref_model() {
774 let mut node_map = HashMap::new();
775 let graph = &mut LineageGraph::new();
776 let idx = graph.add_node(NodeData {
777 unique_id: "model.orders".to_string(),
778 label: "orders".to_string(),
779 node_type: NodeType::Model,
780 file_path: None,
781 description: None,
782 materialization: None,
783 tags: vec![],
784 columns: vec![],
785 exposure: None,
786 });
787 node_map.insert("model.orders".to_string(), idx);
788
789 assert_eq!(resolve_ref("orders", &node_map), "model.orders");
790 }
791
792 #[test]
793 fn test_resolve_ref_seed() {
794 let mut node_map = HashMap::new();
795 let graph = &mut LineageGraph::new();
796 let idx = graph.add_node(NodeData {
797 unique_id: "seed.countries".to_string(),
798 label: "countries".to_string(),
799 node_type: NodeType::Seed,
800 file_path: None,
801 description: None,
802 materialization: None,
803 tags: vec![],
804 columns: vec![],
805 exposure: None,
806 });
807 node_map.insert("seed.countries".to_string(), idx);
808
809 assert_eq!(resolve_ref("countries", &node_map), "seed.countries");
810 }
811
812 #[test]
813 fn test_resolve_ref_snapshot() {
814 let mut node_map = HashMap::new();
815 let graph = &mut LineageGraph::new();
816 let idx = graph.add_node(NodeData {
817 unique_id: "snapshot.snap_orders".to_string(),
818 label: "snap_orders".to_string(),
819 node_type: NodeType::Snapshot,
820 file_path: None,
821 description: None,
822 materialization: None,
823 tags: vec![],
824 columns: vec![],
825 exposure: None,
826 });
827 node_map.insert("snapshot.snap_orders".to_string(), idx);
828
829 assert_eq!(
830 resolve_ref("snap_orders", &node_map),
831 "snapshot.snap_orders"
832 );
833 }
834
835 #[test]
836 fn test_resolve_ref_unknown_defaults_to_model() {
837 let node_map = HashMap::new();
838 assert_eq!(resolve_ref("unknown_ref", &node_map), "model.unknown_ref");
839 }
840
841 #[test]
842 fn test_parse_exposure_ref() {
843 assert_eq!(
844 parse_exposure_ref("ref('orders')"),
845 Some("orders".to_string())
846 );
847 assert_eq!(
848 parse_exposure_ref("ref(\"orders\")"),
849 Some("orders".to_string())
850 );
851 assert_eq!(parse_exposure_ref("source('raw', 'orders')"), None);
852 assert_eq!(parse_exposure_ref("something_else"), None);
853 }
854
855 fn setup_temp_project() -> (tempfile::TempDir, PathBuf) {
857 let tmp = tempfile::tempdir().unwrap();
858 let project_dir = tmp.path().to_path_buf();
859
860 let models_dir = project_dir.join("models");
862 fs::create_dir_all(&models_dir).unwrap();
863
864 fs::write(
865 models_dir.join("stg_orders.sql"),
866 "SELECT * FROM {{ source('raw', 'orders') }}",
867 )
868 .unwrap();
869
870 fs::write(
871 models_dir.join("orders.sql"),
872 "SELECT * FROM {{ ref('stg_orders') }}",
873 )
874 .unwrap();
875
876 fs::write(
878 models_dir.join("schema.yml"),
879 r#"
880version: 2
881sources:
882 - name: raw
883 tables:
884 - name: orders
885 description: "Raw orders table"
886models:
887 - name: stg_orders
888 description: "Staged orders"
889"#,
890 )
891 .unwrap();
892
893 (tmp, project_dir)
894 }
895
896 #[test]
897 fn test_build_graph_sources_and_models() {
898 let (_tmp, project_dir) = setup_temp_project();
899
900 let files = DiscoveredFiles {
901 model_sql_files: vec![
902 project_dir.join("models/stg_orders.sql"),
903 project_dir.join("models/orders.sql"),
904 ],
905 yaml_files: vec![project_dir.join("models/schema.yml")],
906 ..Default::default()
907 };
908
909 let graph = build_graph(&project_dir, &files, None, false, false, &HashMap::new()).unwrap();
911
912 assert_eq!(graph.node_count(), 3);
914
915 let mut types: Vec<NodeType> = graph.node_indices().map(|i| graph[i].node_type).collect();
917 types.sort_by_key(|t| format!("{:?}", t));
918 assert!(types.contains(&NodeType::Source));
919 assert!(types.iter().filter(|t| **t == NodeType::Model).count() == 2);
920
921 assert_eq!(graph.edge_count(), 2);
923 }
924
925 #[test]
926 fn test_build_graph_with_seeds() {
927 let (_tmp, project_dir) = setup_temp_project();
928
929 let seeds_dir = project_dir.join("seeds");
931 fs::create_dir_all(&seeds_dir).unwrap();
932 fs::write(seeds_dir.join("countries.csv"), "id,name\n1,US\n").unwrap();
933
934 let files = DiscoveredFiles {
935 seed_files: vec![project_dir.join("seeds/countries.csv")],
936 ..Default::default()
937 };
938
939 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
940 assert_eq!(graph.node_count(), 1);
941 let node = &graph[graph.node_indices().next().unwrap()];
942 assert_eq!(node.node_type, NodeType::Seed);
943 assert_eq!(node.label, "countries");
944 }
945
946 #[test]
947 fn test_build_graph_with_snapshots() {
948 let (_tmp, project_dir) = setup_temp_project();
949
950 let snap_dir = project_dir.join("snapshots");
951 fs::create_dir_all(&snap_dir).unwrap();
952 fs::write(snap_dir.join("snap_orders.sql"), "SELECT 1").unwrap();
953
954 let files = DiscoveredFiles {
955 snapshot_sql_files: vec![project_dir.join("snapshots/snap_orders.sql")],
956 ..Default::default()
957 };
958
959 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
960 assert_eq!(graph.node_count(), 1);
961 let node = &graph[graph.node_indices().next().unwrap()];
962 assert_eq!(node.node_type, NodeType::Snapshot);
963 assert_eq!(node.label, "snap_orders");
964 }
965
966 #[test]
967 fn test_build_graph_with_tests() {
968 let (_tmp, project_dir) = setup_temp_project();
969
970 let test_dir = project_dir.join("tests");
971 fs::create_dir_all(&test_dir).unwrap();
972 fs::write(
973 test_dir.join("assert_positive.sql"),
974 "SELECT * FROM {{ ref('stg_orders') }} WHERE amount < 0",
975 )
976 .unwrap();
977
978 let models_dir = project_dir.join("models");
980 fs::create_dir_all(&models_dir).unwrap();
981 fs::write(models_dir.join("stg_orders.sql"), "SELECT 1").unwrap();
982
983 let files = DiscoveredFiles {
984 model_sql_files: vec![project_dir.join("models/stg_orders.sql")],
985 test_sql_files: vec![project_dir.join("tests/assert_positive.sql")],
986 ..Default::default()
987 };
988
989 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
990 assert_eq!(graph.node_count(), 2);
992 assert_eq!(graph.edge_count(), 1);
994
995 use petgraph::visit::IntoEdgeReferences;
997 let edge = graph.edge_references().next().unwrap();
998 assert_eq!(edge.weight().edge_type, EdgeType::Test);
999 }
1000
1001 #[test]
1002 fn test_build_graph_with_exposures() {
1003 let (_tmp, project_dir) = setup_temp_project();
1004
1005 let models_dir = project_dir.join("models");
1006 fs::create_dir_all(&models_dir).unwrap();
1007 fs::write(models_dir.join("orders.sql"), "SELECT 1").unwrap();
1008
1009 fs::write(
1010 models_dir.join("schema.yml"),
1011 r#"
1012version: 2
1013sources: []
1014models: []
1015exposures:
1016 - name: weekly_report
1017 description: "Weekly report dashboard"
1018 depends_on:
1019 - ref('orders')
1020"#,
1021 )
1022 .unwrap();
1023
1024 let files = DiscoveredFiles {
1025 model_sql_files: vec![project_dir.join("models/orders.sql")],
1026 yaml_files: vec![project_dir.join("models/schema.yml")],
1027 ..Default::default()
1028 };
1029
1030 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1031 assert_eq!(graph.node_count(), 2);
1033 assert_eq!(graph.edge_count(), 1);
1035 }
1036
1037 #[test]
1038 fn test_build_graph_ref_resolves_to_seed() {
1039 let tmp = tempfile::tempdir().unwrap();
1040 let project_dir = tmp.path().to_path_buf();
1041
1042 let models_dir = project_dir.join("models");
1043 let seeds_dir = project_dir.join("seeds");
1044 fs::create_dir_all(&models_dir).unwrap();
1045 fs::create_dir_all(&seeds_dir).unwrap();
1046
1047 fs::write(seeds_dir.join("countries.csv"), "id,name\n1,US\n").unwrap();
1048 fs::write(
1049 models_dir.join("stg_countries.sql"),
1050 "SELECT * FROM {{ ref('countries') }}",
1051 )
1052 .unwrap();
1053
1054 let files = DiscoveredFiles {
1055 model_sql_files: vec![project_dir.join("models/stg_countries.sql")],
1056 seed_files: vec![project_dir.join("seeds/countries.csv")],
1057 ..Default::default()
1058 };
1059
1060 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1061 assert_eq!(graph.node_count(), 2);
1063 assert_eq!(graph.edge_count(), 1);
1065
1066 let seed_node = graph
1068 .node_indices()
1069 .find(|&i| graph[i].label == "countries")
1070 .unwrap();
1071 assert_eq!(graph[seed_node].node_type, NodeType::Seed);
1072 }
1073
1074 #[test]
1075 fn test_build_graph_phantom_node_for_unresolved_ref() {
1076 let (_tmp, project_dir) = setup_temp_project();
1077
1078 let models_dir = project_dir.join("models");
1079 fs::create_dir_all(&models_dir).unwrap();
1080 fs::write(
1081 models_dir.join("orders.sql"),
1082 "SELECT * FROM {{ ref('nonexistent_model') }}",
1083 )
1084 .unwrap();
1085
1086 let files = DiscoveredFiles {
1087 model_sql_files: vec![project_dir.join("models/orders.sql")],
1088 ..Default::default()
1089 };
1090
1091 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1092 assert_eq!(graph.node_count(), 2);
1094 let phantom = graph
1095 .node_indices()
1096 .find(|&i| graph[i].node_type == NodeType::Phantom)
1097 .expect("Should have a phantom node");
1098 assert_eq!(graph[phantom].label, "nonexistent_model");
1099 }
1100
1101 #[test]
1102 fn test_build_graph_phantom_node_for_unresolved_source() {
1103 let (_tmp, project_dir) = setup_temp_project();
1104
1105 let models_dir = project_dir.join("models");
1106 fs::create_dir_all(&models_dir).unwrap();
1107 fs::write(
1108 models_dir.join("orders.sql"),
1109 "SELECT * FROM {{ source('unknown_src', 'unknown_table') }}",
1110 )
1111 .unwrap();
1112
1113 let files = DiscoveredFiles {
1114 model_sql_files: vec![project_dir.join("models/orders.sql")],
1115 ..Default::default()
1116 };
1117
1118 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1119 assert_eq!(graph.node_count(), 2);
1121 let phantom = graph
1122 .node_indices()
1123 .find(|&i| graph[i].node_type == NodeType::Phantom)
1124 .expect("Should have a phantom source node");
1125 assert_eq!(graph[phantom].label, "unknown_src.unknown_table");
1126 }
1127
1128 #[test]
1129 fn test_build_graph_model_descriptions() {
1130 let (_tmp, project_dir) = setup_temp_project();
1131
1132 let files = DiscoveredFiles {
1133 model_sql_files: vec![project_dir.join("models/stg_orders.sql")],
1134 yaml_files: vec![project_dir.join("models/schema.yml")],
1135 ..Default::default()
1136 };
1137
1138 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1139 let stg = graph
1140 .node_indices()
1141 .find(|&i| graph[i].label == "stg_orders")
1142 .unwrap();
1143 assert_eq!(graph[stg].description.as_deref(), Some("Staged orders"));
1144 }
1145
1146 #[test]
1147 fn test_build_graph_edge_types() {
1148 use petgraph::visit::IntoEdgeReferences;
1149
1150 let (_tmp, project_dir) = setup_temp_project();
1151
1152 let files = DiscoveredFiles {
1153 model_sql_files: vec![
1154 project_dir.join("models/stg_orders.sql"),
1155 project_dir.join("models/orders.sql"),
1156 ],
1157 yaml_files: vec![project_dir.join("models/schema.yml")],
1158 ..Default::default()
1159 };
1160
1161 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1162 let edge_types: Vec<EdgeType> = graph
1163 .edge_references()
1164 .map(|e| e.weight().edge_type)
1165 .collect();
1166 assert!(edge_types.contains(&EdgeType::Source));
1167 assert!(edge_types.contains(&EdgeType::Ref));
1168 }
1169
1170 #[test]
1171 fn test_build_graph_empty_files() {
1172 let tmp = tempfile::tempdir().unwrap();
1173 let files = DiscoveredFiles::default();
1174 let graph = build_graph(tmp.path(), &files, None, true, false, &HashMap::new()).unwrap();
1175 assert_eq!(graph.node_count(), 0);
1176 assert_eq!(graph.edge_count(), 0);
1177 }
1178
1179 #[test]
1180 fn test_build_graph_model_config_merge() {
1181 let tmp = tempfile::tempdir().unwrap();
1183 let project_dir = tmp.path().to_path_buf();
1184
1185 let models_dir = project_dir.join("models");
1186 fs::create_dir_all(&models_dir).unwrap();
1187
1188 fs::write(models_dir.join("stg_orders.sql"), "SELECT 1").unwrap();
1189
1190 fs::write(
1191 models_dir.join("schema.yml"),
1192 r#"
1193version: 2
1194sources: []
1195models:
1196 - name: stg_orders
1197 description: "Staged orders"
1198 tags:
1199 - staging
1200 config:
1201 materialized: table
1202 tags:
1203 - daily
1204"#,
1205 )
1206 .unwrap();
1207
1208 let files = DiscoveredFiles {
1209 model_sql_files: vec![project_dir.join("models/stg_orders.sql")],
1210 yaml_files: vec![project_dir.join("models/schema.yml")],
1211 ..Default::default()
1212 };
1213
1214 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1215 let stg = graph
1216 .node_indices()
1217 .find(|&i| graph[i].label == "stg_orders")
1218 .unwrap();
1219 assert_eq!(graph[stg].materialization.as_deref(), Some("table"));
1220 assert!(graph[stg].tags.contains(&"staging".to_string()));
1221 assert!(graph[stg].tags.contains(&"daily".to_string()));
1222 }
1223
1224 #[test]
1225 fn test_build_graph_duplicate_model_name() {
1226 let tmp = tempfile::tempdir().unwrap();
1228 let project_dir = tmp.path().to_path_buf();
1229
1230 let models_dir = project_dir.join("models");
1231 let subdir = models_dir.join("subdir");
1232 fs::create_dir_all(&subdir).unwrap();
1233
1234 fs::write(models_dir.join("orders.sql"), "SELECT 1").unwrap();
1235 fs::write(subdir.join("orders.sql"), "SELECT 2").unwrap();
1236
1237 let files = DiscoveredFiles {
1238 model_sql_files: vec![
1239 project_dir.join("models/orders.sql"),
1240 project_dir.join("models/subdir/orders.sql"),
1241 ],
1242 ..Default::default()
1243 };
1244
1245 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1247 let order_nodes: Vec<_> = graph
1249 .node_indices()
1250 .filter(|&i| graph[i].label == "orders")
1251 .collect();
1252 assert_eq!(order_nodes.len(), 2);
1253 }
1254
1255 #[test]
1256 fn test_build_graph_file_paths_are_relative() {
1257 let (_tmp, project_dir) = setup_temp_project();
1258
1259 let files = DiscoveredFiles {
1260 model_sql_files: vec![
1261 project_dir.join("models/stg_orders.sql"),
1262 project_dir.join("models/orders.sql"),
1263 ],
1264 yaml_files: vec![project_dir.join("models/schema.yml")],
1265 ..Default::default()
1266 };
1267
1268 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1269
1270 for idx in graph.node_indices() {
1271 let node = &graph[idx];
1272 if let Some(ref fp) = node.file_path {
1273 assert!(
1274 fp.is_relative(),
1275 "file_path for node '{}' should be relative but got: {}",
1276 node.label,
1277 fp.display()
1278 );
1279 assert!(
1280 !fp.starts_with(&project_dir),
1281 "file_path for node '{}' should not start with project_dir: {}",
1282 node.label,
1283 fp.display()
1284 );
1285 }
1286 }
1287
1288 let source_node = graph
1290 .node_indices()
1291 .find(|&i| graph[i].node_type == NodeType::Source)
1292 .expect("should have a source node");
1293 assert_eq!(
1294 graph[source_node].file_path.as_deref(),
1295 Some(std::path::Path::new("models/schema.yml"))
1296 );
1297
1298 let model_node = graph
1300 .node_indices()
1301 .find(|&i| graph[i].label == "stg_orders")
1302 .unwrap();
1303 assert_eq!(
1304 graph[model_node].file_path.as_deref(),
1305 Some(std::path::Path::new("models/stg_orders.sql"))
1306 );
1307 }
1308
1309 #[test]
1310 fn test_build_graph_with_macros() {
1311 let tmp = tempfile::tempdir().unwrap();
1312 let project_dir = tmp.path().to_path_buf();
1313
1314 let models_dir = project_dir.join("models");
1315 let macros_dir = project_dir.join("macros");
1316 fs::create_dir_all(&models_dir).unwrap();
1317 fs::create_dir_all(¯os_dir).unwrap();
1318
1319 fs::write(
1321 macros_dir.join("my_macro.sql"),
1322 r#"
1323{% macro my_cte() %}
1324 SELECT * FROM {{ ref('base_table') }}
1325{% endmacro %}
1326"#,
1327 )
1328 .unwrap();
1329
1330 fs::write(models_dir.join("base_table.sql"), "SELECT 1 as id").unwrap();
1332 fs::write(
1333 models_dir.join("derived.sql"),
1334 "SELECT * FROM ({{ my_cte() }})",
1335 )
1336 .unwrap();
1337
1338 let files = DiscoveredFiles {
1339 model_sql_files: vec![
1340 project_dir.join("models/base_table.sql"),
1341 project_dir.join("models/derived.sql"),
1342 ],
1343 macro_sql_files: vec![project_dir.join("macros/my_macro.sql")],
1344 ..Default::default()
1345 };
1346
1347 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1348 assert_eq!(graph.node_count(), 2);
1350 assert_eq!(graph.edge_count(), 1);
1352
1353 let base = graph
1355 .node_indices()
1356 .find(|&i| graph[i].label == "base_table")
1357 .unwrap();
1358 let derived = graph
1359 .node_indices()
1360 .find(|&i| graph[i].label == "derived")
1361 .unwrap();
1362 assert!(graph.contains_edge(base, derived));
1363 }
1364
1365 #[test]
1366 fn test_var_list_expansion_resolves_refs() {
1367 let tmp = tempfile::tempdir().unwrap();
1368 let project_dir = tmp.path().to_path_buf();
1369 let models_dir = project_dir.join("models");
1370 fs::create_dir_all(&models_dir).unwrap();
1371
1372 fs::write(project_dir.join("dbt_project.yml"), "name: var_test\n").unwrap();
1374
1375 fs::write(
1377 models_dir.join("combined.sql"),
1378 r#"
1379 {%- set categories = var("product_categories") -%}
1380 {%- for cat in categories -%}
1381 SELECT * FROM {{ ref('stg_' ~ cat ~ '_summary') }}
1382 {% if not loop.last %}UNION ALL{% endif %}
1383 {%- endfor -%}
1384 "#,
1385 )
1386 .unwrap();
1387
1388 fs::write(models_dir.join("stg_electronics_summary.sql"), "SELECT 1").unwrap();
1390 fs::write(models_dir.join("stg_clothing_summary.sql"), "SELECT 1").unwrap();
1391
1392 let files = DiscoveredFiles {
1393 model_sql_files: vec![
1394 project_dir.join("models/combined.sql"),
1395 project_dir.join("models/stg_electronics_summary.sql"),
1396 project_dir.join("models/stg_clothing_summary.sql"),
1397 ],
1398 ..Default::default()
1399 };
1400
1401 let mut vars = HashMap::new();
1403 vars.insert(
1404 "product_categories".to_string(),
1405 serde_json::json!(["electronics", "clothing"]),
1406 );
1407
1408 let graph = build_graph(&project_dir, &files, None, true, false, &vars).unwrap();
1409
1410 assert_eq!(graph.node_count(), 3);
1412
1413 assert_eq!(graph.edge_count(), 2);
1415
1416 let combined = graph
1417 .node_indices()
1418 .find(|&i| graph[i].label == "combined")
1419 .unwrap();
1420 let electronics = graph
1421 .node_indices()
1422 .find(|&i| graph[i].label == "stg_electronics_summary")
1423 .unwrap();
1424 let clothing = graph
1425 .node_indices()
1426 .find(|&i| graph[i].label == "stg_clothing_summary")
1427 .unwrap();
1428 assert!(graph.contains_edge(electronics, combined));
1429 assert!(graph.contains_edge(clothing, combined));
1430 }
1431
1432 #[test]
1433 fn test_generic_tests_from_yaml() {
1434 let tmp = tempfile::tempdir().unwrap();
1435 let project_dir = tmp.path().to_path_buf();
1436 let models_dir = project_dir.join("models");
1437 fs::create_dir_all(&models_dir).unwrap();
1438
1439 fs::write(project_dir.join("dbt_project.yml"), "name: test_proj\n").unwrap();
1440 fs::write(models_dir.join("orders.sql"), "SELECT 1 AS order_id").unwrap();
1441
1442 fs::write(
1444 models_dir.join("schema.yml"),
1445 r#"
1446sources:
1447 - name: raw
1448 tables:
1449 - name: events
1450 columns:
1451 - name: event_id
1452 data_tests:
1453 - not_null
1454models:
1455 - name: orders
1456 data_tests:
1457 - dbt_utils.expression_is_true:
1458 expression: "a = b"
1459 - dbt_utils.expression_is_true:
1460 expression: "c = d"
1461 columns:
1462 - name: order_id
1463 data_tests:
1464 - not_null
1465 - unique
1466"#,
1467 )
1468 .unwrap();
1469
1470 let files = DiscoveredFiles {
1471 model_sql_files: vec![project_dir.join("models/orders.sql")],
1472 yaml_files: vec![project_dir.join("models/schema.yml")],
1473 ..Default::default()
1474 };
1475
1476 let graph = build_graph(&project_dir, &files, None, true, false, &HashMap::new()).unwrap();
1477
1478 assert_eq!(graph.node_count(), 7);
1480
1481 let test_nodes: Vec<_> = graph
1482 .node_indices()
1483 .filter(|&i| graph[i].node_type == NodeType::Test)
1484 .collect();
1485 assert_eq!(test_nodes.len(), 5);
1486
1487 let mut test_ids: Vec<&str> = test_nodes
1489 .iter()
1490 .map(|&i| graph[i].unique_id.as_str())
1491 .collect();
1492 test_ids.sort();
1493 assert!(test_ids.contains(&"test.not_null.orders.order_id"));
1494 assert!(test_ids.contains(&"test.unique.orders.order_id"));
1495 assert!(test_ids.contains(&"test.not_null.raw.events.event_id"));
1496 assert!(test_ids.contains(&"test.dbt_utils.expression_is_true.orders"));
1498 assert!(test_ids.contains(&"test.dbt_utils.expression_is_true.orders_2"));
1499
1500 let model_idx = graph
1502 .node_indices()
1503 .find(|&i| graph[i].unique_id == "model.orders")
1504 .unwrap();
1505 let source_idx = graph
1506 .node_indices()
1507 .find(|&i| graph[i].unique_id == "source.raw.events")
1508 .unwrap();
1509
1510 let model_test_edges = graph
1511 .edges_directed(model_idx, petgraph::Direction::Outgoing)
1512 .filter(|e| e.weight().edge_type == EdgeType::Test)
1513 .count();
1514 assert_eq!(model_test_edges, 4);
1515
1516 let source_test_edges = graph
1517 .edges_directed(source_idx, petgraph::Direction::Outgoing)
1518 .filter(|e| e.weight().edge_type == EdgeType::Test)
1519 .count();
1520 assert_eq!(source_test_edges, 1);
1521
1522 for &ti in &test_nodes {
1524 assert_eq!(
1525 graph[ti].file_path.as_deref(),
1526 Some(std::path::Path::new("models/schema.yml")),
1527 "test node '{}' should have file_path",
1528 graph[ti].unique_id,
1529 );
1530 }
1531
1532 let mut test_labels: Vec<&str> = test_nodes
1534 .iter()
1535 .map(|&i| graph[i].label.as_str())
1536 .collect();
1537 test_labels.sort();
1538 let deduped_len = test_labels.len();
1539 test_labels.dedup();
1540 assert_eq!(
1541 test_labels.len(),
1542 deduped_len,
1543 "All test labels should be unique"
1544 );
1545 assert!(test_labels.contains(&"dbt_utils.expression_is_true_orders"));
1547 assert!(test_labels.contains(&"dbt_utils.expression_is_true_orders_2"));
1548 }
1549
1550 #[test]
1551 fn test_generic_test_ids_deterministic_across_yaml_order() {
1552 let tmp = tempfile::tempdir().unwrap();
1555 let project_dir = tmp.path().to_path_buf();
1556 let models_dir = project_dir.join("models");
1557 let sub_dir = models_dir.join("sub");
1558 fs::create_dir_all(&sub_dir).unwrap();
1559
1560 fs::write(models_dir.join("orders.sql"), "SELECT 1 AS order_id").unwrap();
1561
1562 let yaml_a = models_dir.join("a_schema.yml");
1564 let yaml_b = sub_dir.join("b_schema.yml");
1565 let yaml_content = r#"
1566models:
1567 - name: orders
1568 columns:
1569 - name: order_id
1570 data_tests:
1571 - not_null
1572"#;
1573 fs::write(&yaml_a, yaml_content).unwrap();
1574 fs::write(&yaml_b, yaml_content).unwrap();
1575
1576 let files_fwd = DiscoveredFiles {
1578 model_sql_files: vec![project_dir.join("models/orders.sql")],
1579 yaml_files: vec![yaml_a.clone(), yaml_b.clone()],
1580 ..Default::default()
1581 };
1582 let graph_fwd =
1583 build_graph(&project_dir, &files_fwd, None, true, false, &HashMap::new()).unwrap();
1584
1585 let files_rev = DiscoveredFiles {
1587 model_sql_files: vec![project_dir.join("models/orders.sql")],
1588 yaml_files: vec![yaml_b, yaml_a],
1589 ..Default::default()
1590 };
1591 let graph_rev =
1592 build_graph(&project_dir, &files_rev, None, true, false, &HashMap::new()).unwrap();
1593
1594 let mut ids_fwd: Vec<String> = graph_fwd
1596 .node_indices()
1597 .filter(|&i| graph_fwd[i].node_type == NodeType::Test)
1598 .map(|i| graph_fwd[i].unique_id.clone())
1599 .collect();
1600 ids_fwd.sort();
1601
1602 let mut ids_rev: Vec<String> = graph_rev
1603 .node_indices()
1604 .filter(|&i| graph_rev[i].node_type == NodeType::Test)
1605 .map(|i| graph_rev[i].unique_id.clone())
1606 .collect();
1607 ids_rev.sort();
1608
1609 assert_eq!(ids_fwd, ids_rev);
1610 assert_eq!(ids_fwd.len(), 2);
1611 assert!(ids_fwd.contains(&"test.not_null.orders.order_id".to_string()));
1612 assert!(ids_fwd.contains(&"test.not_null.orders.order_id_2".to_string()));
1613 }
1614}