1use std::io::{self, BufRead, IsTerminal};
2use std::path::{Path, PathBuf};
3
4use path_slash::PathExt as _;
5
6use crate::graph::types::LineageGraph;
7use crate::parser::project::ResolvedPaths;
8use crate::parser::yaml_schema;
9
/// Classification of a single stdin line, as produced by `classify_line`.
///
/// `Debug`, `Clone`, `PartialEq` and `Eq` are derived so values can be printed in
/// diagnostics and compared directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
enum InputLine {
    /// A `.sql` path that lies under one of the configured dbt directories
    /// (carried as a normalized absolute path).
    SqlFile(PathBuf),
    /// A `.yml` / `.yaml` path that lies under one of the configured dbt
    /// directories (carried as a normalized absolute path).
    YamlFile(PathBuf),
    /// A line that does not look like a file path; passed through verbatim as a name.
    ModelName(String),
    /// A line that should be dropped (a file outside the dbt directories or a
    /// file of an unrelated type).
    Ignore,
}
21
/// Canonicalizes `path`, tolerating a tail that does not exist on disk.
///
/// The longest leading portion of `path` that `canonicalize` accepts is resolved
/// through the filesystem; the remaining trailing components are then re-applied
/// lexically on top of it: ordinary names are appended and `..` removes the last
/// component. Re-applying `..` matters: dropping it would turn
/// `models/ghost/../../other/x.sql` into `models/ghost/other/x.sql` and make a path
/// outside a directory look as if it were inside it.
///
/// If no prefix of `path` can be canonicalized (for example a relative path whose
/// first component does not exist), `path` is returned unchanged.
pub(crate) fn normalize_path(path: &Path) -> PathBuf {
    // Trailing components peeled off `path`, innermost first.
    let mut pending: Vec<std::ffi::OsString> = Vec::new();
    let mut current = path.to_path_buf();
    loop {
        // The first iteration tries the full path, so an existing path is
        // returned fully canonicalized with nothing to replay.
        if let Ok(mut resolved) = current.canonicalize() {
            for part in pending.into_iter().rev() {
                if part == ".." {
                    // Lexical parent: the components below `resolved` do not
                    // exist, so there is no symlink that could be bypassed.
                    resolved.pop();
                } else {
                    resolved.push(part);
                }
            }
            return resolved;
        }
        // Remember the component that `pop` is about to discard. Root, prefix
        // and `.` components carry nothing that needs replaying.
        match current.components().next_back() {
            Some(std::path::Component::Normal(name)) => pending.push(name.to_owned()),
            Some(std::path::Component::ParentDir) => pending.push("..".into()),
            _ => {}
        }
        if !current.pop() {
            break;
        }
    }
    path.to_path_buf()
}
50
/// Turns `path_str` into an absolute path by anchoring it at `cwd`.
///
/// An already-absolute `path_str` is returned as-is; anything else is joined
/// onto `cwd`.
fn to_absolute(path_str: &str, cwd: &Path) -> PathBuf {
    // `Path::join` discards the base when handed an absolute path, so a single
    // call covers both the absolute and the relative case.
    cwd.join(path_str)
}
63
/// Reads every non-blank line from standard input, trimmed of surrounding whitespace.
///
/// Returns an empty vector without reading anything when stdin is a terminal.
/// On Unix it additionally returns empty unless stdin is a FIFO (pipe) or a
/// regular file, or when stdin's file type cannot be determined.
///
/// Reading stops at the first line that fails to read (for example invalid
/// UTF-8); lines collected up to that point are returned.
pub fn read_stdin_lines() -> Vec<String> {
    let stdin = io::stdin();
    if stdin.is_terminal() {
        return Vec::new();
    }

    #[cfg(unix)]
    {
        use std::os::unix::fs::FileTypeExt;
        use std::os::unix::io::AsFd;

        // Inspect stdin's file type through a duplicated descriptor. The
        // duplicate is an owned fd that is closed when the temporary `File`
        // drops, so stdin itself is never closed and no `unsafe` is needed.
        let file_type = stdin
            .as_fd()
            .try_clone_to_owned()
            .map(std::fs::File::from)
            .and_then(|file| file.metadata())
            .map(|meta| meta.file_type());
        match file_type {
            Ok(ft) if ft.is_fifo() || ft.is_file() => {}
            // Any other kind of stdin, or a failure to inspect it: read nothing.
            _ => return Vec::new(),
        }
    }

    stdin
        .lock()
        .lines()
        // Stop at the first read error instead of retrying a failing stream.
        .map_while(Result::ok)
        .filter_map(|line| {
            let trimmed = line.trim();
            (!trimmed.is_empty()).then(|| trimmed.to_string())
        })
        .collect()
}
110
111fn classify_line(line: &str, resolved_paths: &ResolvedPaths, cwd: &Path) -> InputLine {
114 let path = Path::new(line);
115 match path.extension().and_then(|e| e.to_str()) {
116 Some("sql") => {
117 let abs = normalize_path(&to_absolute(line, cwd));
118 if is_under_dbt_paths(&abs, resolved_paths) {
119 InputLine::SqlFile(abs)
120 } else {
121 InputLine::Ignore
122 }
123 }
124 Some("yml" | "yaml") => {
125 let abs = normalize_path(&to_absolute(line, cwd));
126 if is_under_dbt_paths(&abs, resolved_paths) {
127 InputLine::YamlFile(abs)
128 } else {
129 InputLine::Ignore
130 }
131 }
132 Some(ext) => {
133 if line.contains('/') || line.contains('\\') || is_common_file_extension(ext) {
139 InputLine::Ignore
140 } else {
141 InputLine::ModelName(line.to_string())
142 }
143 }
144 None => {
145 if line.contains('/') || line.contains('\\') {
148 InputLine::Ignore
149 } else {
150 InputLine::ModelName(line.to_string())
151 }
152 }
153 }
154}
155
/// Reports whether `ext` (without the leading dot) is one of the well-known
/// non-dbt file extensions.
///
/// The comparison is exact and case-sensitive.
fn is_common_file_extension(ext: &str) -> bool {
    const KNOWN_EXTENSIONS: &[&str] = &[
        "md",
        "txt",
        "py",
        "csv",
        "json",
        "toml",
        "cfg",
        "ini",
        "rst",
        "lock",
        "xml",
        "html",
        "htm",
        "js",
        "ts",
        "sh",
        "bat",
        "rs",
        "go",
        "java",
        "rb",
        "c",
        "h",
        "cpp",
        "hpp",
        "swift",
        "kt",
        "log",
        "env",
        "gitignore",
    ];
    KNOWN_EXTENSIONS.contains(&ext)
}
199
/// Returns `true` when at least one input looks like a file path rather than a
/// bare name: it contains `/` or `\`, or ends in `.sql`, `.yml` or `.yaml`.
pub fn has_path_like_input(inputs: &[String]) -> bool {
    const DBT_SUFFIXES: [&str; 3] = [".sql", ".yml", ".yaml"];

    inputs.iter().any(|input| {
        let has_separator = input.contains(|c: char| matches!(c, '/' | '\\'));
        has_separator || DBT_SUFFIXES.iter().any(|suffix| input.ends_with(*suffix))
    })
}
212
213fn is_under_dbt_paths(abs_path: &Path, resolved_paths: &ResolvedPaths) -> bool {
215 let abs_path = normalize_path(abs_path);
216 let all_paths = resolved_paths
217 .model_paths
218 .iter()
219 .chain(&resolved_paths.seed_paths)
220 .chain(&resolved_paths.snapshot_paths)
221 .chain(&resolved_paths.test_paths)
222 .chain(&resolved_paths.analysis_paths);
223
224 all_paths.into_iter().any(|dir| abs_path.starts_with(dir))
225}
226
227fn resolve_sql_to_label(
229 abs_path: &Path,
230 graph: &LineageGraph,
231 project_dir: &Path,
232) -> Option<String> {
233 let abs_path = normalize_path(abs_path);
234 let project_dir = normalize_path(project_dir);
235 let relative = abs_path.strip_prefix(&project_dir).ok()?;
236 let rel_str = relative.to_slash_lossy();
238
239 graph.node_indices().find_map(|idx| {
240 let node = &graph[idx];
241 match &node.file_path {
242 Some(node_path) => {
243 let node_str = node_path.to_slash_lossy();
244 if node_str == rel_str {
245 Some(node.label.clone())
246 } else {
247 None
248 }
249 }
250 None => None,
251 }
252 })
253}
254
255fn expand_yaml_names(abs_path: &Path) -> Vec<String> {
257 let content = match std::fs::read_to_string(abs_path) {
258 Ok(c) => c,
259 Err(e) => {
260 crate::warn!("could not read {}: {}", abs_path.display(), e);
261 return Vec::new();
262 }
263 };
264
265 let schema = match yaml_schema::parse_schema_file(&content, Some(abs_path)) {
266 Ok(s) => s,
267 Err(e) => {
268 crate::warn!("could not parse {}: {}", abs_path.display(), e);
269 return Vec::new();
270 }
271 };
272
273 let mut names = Vec::new();
274 for source in &schema.sources {
275 for table in &source.tables {
276 names.push(format!("{}.{}", source.name, table.name));
277 }
278 }
279 for model in &schema.models {
280 names.push(model.name.clone());
281 }
282 names
283}
284
285pub fn resolve_stdin_inputs(
292 lines: &[String],
293 graph: &LineageGraph,
294 resolved_paths: &ResolvedPaths,
295 project_dir: &Path,
296 cwd: &Path,
297) -> Vec<String> {
298 let mut seen = std::collections::HashSet::new();
299 let mut names = Vec::new();
300
301 for line in lines {
302 match classify_line(line, resolved_paths, cwd) {
303 InputLine::SqlFile(abs_path) => {
304 if let Some(label) = resolve_sql_to_label(&abs_path, graph, project_dir) {
305 if seen.insert(label.clone()) {
306 names.push(label);
307 }
308 } else {
309 crate::warn!("no node found for file {}, skipping.", abs_path.display());
310 }
311 }
312 InputLine::YamlFile(abs_path) => {
313 for name in expand_yaml_names(&abs_path) {
314 if seen.insert(name.clone()) {
315 names.push(name);
316 }
317 }
318 }
319 InputLine::ModelName(name) => {
320 if seen.insert(name.clone()) {
321 names.push(name);
322 }
323 }
324 InputLine::Ignore => {}
325 }
326 }
327
328 names
329}
330
// Unit tests for stdin-line classification, path checks, SQL/YAML resolution and
// the stdin file-type assumptions.
//
// NOTE(review): whitespace inside the YAML fixture strings below is reproduced
// exactly as it appears in the reviewed text, where runs of whitespace look
// collapsed. Nested YAML keys must be indented under their parent for these
// fixtures to parse as intended — confirm against the checked-in file.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::types::{NodeData, NodeType};
    use std::fs;

    /// Builds a `ResolvedPaths` whose directories are the conventional
    /// sub-directories of `project_dir`, each normalized with `normalize_path`.
    fn make_resolved_paths(project_dir: &Path) -> ResolvedPaths {
        let norm = |name: &str| vec![normalize_path(&project_dir.join(name))];
        ResolvedPaths {
            model_paths: norm("models"),
            seed_paths: norm("seeds"),
            snapshot_paths: norm("snapshots"),
            test_paths: norm("tests"),
            macro_paths: norm("macros"),
            analysis_paths: norm("analyses"),
        }
    }

    /// Builds a minimal node with the given id, label and type; every optional
    /// field is empty. Tests set `file_path` afterwards where they need it.
    fn make_node(unique_id: &str, label: &str, node_type: NodeType) -> NodeData {
        NodeData {
            unique_id: unique_id.to_string(),
            label: label.to_string(),
            node_type,
            file_path: None,
            description: None,
            materialization: None,
            tags: vec![],
            columns: vec![],
            exposure: None,
        }
    }

    // ---- classify_line ----

    // A .sql path under models/ is classified as a SQL file.
    #[test]
    fn test_classify_sql_under_models() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let result = classify_line("models/staging/stg_orders.sql", &paths, tmp.path());
        assert!(matches!(result, InputLine::SqlFile(_)));
    }

    // A .sql path under snapshots/ is classified as a SQL file.
    #[test]
    fn test_classify_sql_under_snapshots() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let result = classify_line("snapshots/snap_orders.sql", &paths, tmp.path());
        assert!(matches!(result, InputLine::SqlFile(_)));
    }

    // A .sql path under analyses/ is classified as a SQL file.
    #[test]
    fn test_classify_sql_under_analyses() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let result = classify_line("analyses/my_analysis.sql", &paths, tmp.path());
        assert!(matches!(result, InputLine::SqlFile(_)));
    }

    // A .sql path outside every configured directory is ignored.
    #[test]
    fn test_classify_sql_outside_dbt_paths() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let result = classify_line("other/script.sql", &paths, tmp.path());
        assert!(matches!(result, InputLine::Ignore));
    }

    // A .yml path under models/ is classified as a YAML file.
    #[test]
    fn test_classify_yml_under_models() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let result = classify_line("models/staging/schema.yml", &paths, tmp.path());
        assert!(matches!(result, InputLine::YamlFile(_)));
    }

    // The .yaml spelling is accepted as well.
    #[test]
    fn test_classify_yaml_under_models() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let result = classify_line("models/schema.yaml", &paths, tmp.path());
        assert!(matches!(result, InputLine::YamlFile(_)));
    }

    // A .yml path outside every configured directory is ignored.
    #[test]
    fn test_classify_yml_outside_dbt_paths() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let result = classify_line(".github/workflows/ci.yml", &paths, tmp.path());
        assert!(matches!(result, InputLine::Ignore));
    }

    // Paths with a separator and a non-sql/yaml extension are ignored, even when
    // they sit under a configured directory.
    #[test]
    fn test_classify_non_dbt_extension_with_separator() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        assert!(matches!(
            classify_line("seeds/data.csv", &paths, tmp.path()),
            InputLine::Ignore
        ));
        assert!(matches!(
            classify_line("models/model.py", &paths, tmp.path()),
            InputLine::Ignore
        ));
    }

    // Bare file names with a well-known extension are ignored rather than being
    // mistaken for names.
    #[test]
    fn test_classify_non_dbt_extension_without_separator() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        assert!(matches!(
            classify_line("README.md", &paths, tmp.path()),
            InputLine::Ignore
        ));
        assert!(matches!(
            classify_line("Cargo.toml", &paths, tmp.path()),
            InputLine::Ignore
        ));
        assert!(matches!(
            classify_line("setup.py", &paths, tmp.path()),
            InputLine::Ignore
        ));
    }

    // A bare word without extension or separator is a name.
    #[test]
    fn test_classify_no_extension() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let result = classify_line("stg_orders", &paths, tmp.path());
        assert!(matches!(result, InputLine::ModelName(ref n) if n == "stg_orders"));
    }

    // A dotted name whose "extension" is not a known file type stays a name.
    #[test]
    fn test_classify_source_name() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let result = classify_line("raw.orders", &paths, tmp.path());
        assert!(matches!(result, InputLine::ModelName(ref n) if n == "raw.orders"));
    }

    // ---- is_under_dbt_paths ----

    // A file nested below models/ counts as being under the dbt paths.
    #[test]
    fn test_is_under_dbt_paths_nested() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let abs = tmp.path().join("models/staging/stg_orders.sql");
        assert!(is_under_dbt_paths(&abs, &paths));
    }

    // A file directly inside models/ counts as being under the dbt paths.
    #[test]
    fn test_is_under_dbt_paths_absolute() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let abs = tmp.path().join("models/orders.sql");
        assert!(is_under_dbt_paths(&abs, &paths));
    }

    // A file in an unrelated directory is not under the dbt paths.
    #[test]
    fn test_is_not_under_dbt_paths() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let abs = tmp.path().join("other/file.sql");
        assert!(!is_under_dbt_paths(&abs, &paths));
    }

    // ---- resolve_sql_to_label ----

    // A node whose project-relative file_path matches the input yields its label.
    #[test]
    fn test_resolve_sql_to_label_found() {
        let project_dir = Path::new("/project");
        let mut graph = LineageGraph::new();
        let mut node = make_node("model.stg_orders", "stg_orders", NodeType::Model);
        node.file_path = Some(PathBuf::from("models/staging/stg_orders.sql"));
        graph.add_node(node);

        let abs = Path::new("/project/models/staging/stg_orders.sql");
        let result = resolve_sql_to_label(abs, &graph, project_dir);
        assert_eq!(result, Some("stg_orders".to_string()));
    }

    // An empty graph has no matching node.
    #[test]
    fn test_resolve_sql_to_label_not_found() {
        let project_dir = Path::new("/project");
        let graph = LineageGraph::new();

        let abs = Path::new("/project/models/nonexistent.sql");
        let result = resolve_sql_to_label(abs, &graph, project_dir);
        assert_eq!(result, None);
    }

    // ---- expand_yaml_names ----

    // Source tables are reported as "<source>.<table>" in file order.
    #[test]
    fn test_expand_yaml_sources() {
        let tmp = tempfile::tempdir().unwrap();
        let yaml_path = tmp.path().join("schema.yml");
        // NOTE(review): fixture whitespace kept exactly as seen — confirm indentation.
        fs::write(
            &yaml_path,
            r#"
sources:
 - name: raw
 tables:
 - name: orders
 - name: customers
"#,
        )
        .unwrap();

        let names = expand_yaml_names(&yaml_path);
        assert_eq!(names, vec!["raw.orders", "raw.customers"]);
    }

    // Model entries are reported by name in file order.
    #[test]
    fn test_expand_yaml_models() {
        let tmp = tempfile::tempdir().unwrap();
        let yaml_path = tmp.path().join("schema.yml");
        fs::write(
            &yaml_path,
            r#"
models:
 - name: stg_orders
 - name: stg_customers
"#,
        )
        .unwrap();

        let names = expand_yaml_names(&yaml_path);
        assert_eq!(names, vec!["stg_orders", "stg_customers"]);
    }

    // With both sections present, source tables precede models in the output.
    #[test]
    fn test_expand_yaml_mixed() {
        let tmp = tempfile::tempdir().unwrap();
        let yaml_path = tmp.path().join("schema.yml");
        // NOTE(review): fixture whitespace kept exactly as seen — confirm indentation.
        fs::write(
            &yaml_path,
            r#"
sources:
 - name: raw
 tables:
 - name: orders
models:
 - name: stg_orders
"#,
        )
        .unwrap();

        let names = expand_yaml_names(&yaml_path);
        assert_eq!(names, vec!["raw.orders", "stg_orders"]);
    }

    // An unreadable file yields no names instead of failing.
    #[test]
    fn test_expand_yaml_file_not_found() {
        let names = expand_yaml_names(Path::new("/nonexistent/schema.yml"));
        assert!(names.is_empty());
    }

    // An empty file yields no names.
    #[test]
    fn test_expand_yaml_empty_file() {
        let tmp = tempfile::tempdir().unwrap();
        let yaml_path = tmp.path().join("schema.yml");
        fs::write(&yaml_path, "").unwrap();

        let names = expand_yaml_names(&yaml_path);
        assert!(names.is_empty());
    }

    // ---- has_path_like_input ----

    // Any entry with a separator or a .sql/.yml/.yaml suffix makes the input path-like.
    #[test]
    fn test_has_path_like_input_with_paths() {
        assert!(has_path_like_input(&["models/foo.sql".into()]));
        assert!(has_path_like_input(&[
            "stg_orders".into(),
            "models/bar.yml".into()
        ]));
        assert!(has_path_like_input(&["schema.yaml".into()]));
    }

    // Plain and dotted names alone are not path-like.
    #[test]
    fn test_has_path_like_input_model_names_only() {
        assert!(!has_path_like_input(&["stg_orders".into()]));
        assert!(!has_path_like_input(&[
            "raw.orders".into(),
            "customers".into()
        ]));
    }

    // ---- resolve_stdin_inputs ----

    // A bare name passes straight through, even with an empty graph.
    #[test]
    fn test_resolve_stdin_model_name() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let graph = LineageGraph::new();

        let lines = vec!["stg_orders".to_string()];
        let result = resolve_stdin_inputs(&lines, &graph, &paths, tmp.path(), tmp.path());
        assert_eq!(result, vec!["stg_orders"]);
    }

    // Paths with non-dbt extensions produce no output.
    #[test]
    fn test_resolve_stdin_ignores_non_dbt() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let graph = LineageGraph::new();

        let lines = vec!["docs/README.md".to_string(), "seeds/data.csv".to_string()];
        let result = resolve_stdin_inputs(&lines, &graph, &paths, tmp.path(), tmp.path());
        assert!(result.is_empty());
    }

    // The same model reached via its file path and via its name is emitted once.
    #[test]
    fn test_resolve_stdin_deduplicates() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let mut graph = LineageGraph::new();
        let mut node = make_node("model.stg_orders", "stg_orders", NodeType::Model);
        node.file_path = Some(PathBuf::from("models/stg_orders.sql"));
        graph.add_node(node);

        let models_dir = tmp.path().join("models");
        // The models directory is created on disk; the .sql file itself is not.
        fs::create_dir_all(&models_dir).unwrap();
        let lines = vec![
            "models/stg_orders.sql".to_string(),
            "stg_orders".to_string(),
        ];
        let result = resolve_stdin_inputs(&lines, &graph, &paths, tmp.path(), tmp.path());
        assert_eq!(result, vec!["stg_orders"]);
    }

    // Well-known root-level files are dropped while a bare name is kept.
    #[test]
    fn test_resolve_stdin_ignores_root_files() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let graph = LineageGraph::new();

        let lines = vec![
            "README.md".to_string(),
            "Cargo.toml".to_string(),
            "stg_orders".to_string(),
        ];
        let result = resolve_stdin_inputs(&lines, &graph, &paths, tmp.path(), tmp.path());
        assert_eq!(result, vec!["stg_orders"]);
    }

    // ---- classification + resolution combined ----

    // A relative .sql line is classified and then resolved to its node label.
    #[test]
    fn test_classify_and_resolve_sql() {
        let tmp = tempfile::tempdir().unwrap();
        let paths = make_resolved_paths(tmp.path());
        let mut graph = LineageGraph::new();
        let mut node = make_node("model.stg_orders", "stg_orders", NodeType::Model);
        node.file_path = Some(PathBuf::from("models/staging/stg_orders.sql"));
        graph.add_node(node);

        let line = "models/staging/stg_orders.sql";
        match classify_line(line, &paths, tmp.path()) {
            InputLine::SqlFile(abs_path) => {
                let label = resolve_sql_to_label(&abs_path, &graph, tmp.path());
                assert_eq!(label, Some("stg_orders".to_string()));
            }
            other => panic!("Expected SqlFile, got {:?}", std::mem::discriminant(&other)),
        }
    }

    // A relative .yml line is classified and then expanded to the names it declares.
    #[test]
    fn test_classify_and_resolve_yaml() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("models");
        fs::create_dir_all(&models_dir).unwrap();
        // NOTE(review): fixture whitespace kept exactly as seen — confirm indentation.
        fs::write(
            models_dir.join("schema.yml"),
            "sources:\n - name: raw\n tables:\n - name: orders\n",
        )
        .unwrap();

        let paths = make_resolved_paths(tmp.path());

        let line = "models/schema.yml";
        match classify_line(line, &paths, tmp.path()) {
            InputLine::YamlFile(abs_path) => {
                let names = expand_yaml_names(&abs_path);
                assert_eq!(names, vec!["raw.orders"]);
            }
            other => panic!(
                "Expected YamlFile, got {:?}",
                std::mem::discriminant(&other)
            ),
        }
    }

    // A mixed batch: SQL file, YAML file, bare name, and two lines that must be ignored.
    #[test]
    fn test_classify_and_resolve_mixed() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("models");
        fs::create_dir_all(models_dir.join("staging")).unwrap();
        // NOTE(review): fixture whitespace kept exactly as seen — confirm indentation.
        fs::write(
            models_dir.join("schema.yml"),
            "sources:\n - name: raw\n tables:\n - name: orders\n",
        )
        .unwrap();

        let paths = make_resolved_paths(tmp.path());
        let mut graph = LineageGraph::new();
        let mut node = make_node("model.stg_orders", "stg_orders", NodeType::Model);
        node.file_path = Some(PathBuf::from("models/staging/stg_orders.sql"));
        graph.add_node(node);

        let inputs = vec![
            "models/staging/stg_orders.sql",
            "models/schema.yml",
            "raw.customers",
            ".github/workflows/ci.yml",
            "docs/README.md",
        ];

        // Mirrors the per-line dispatch of resolve_stdin_inputs, without de-duplication.
        let mut result = Vec::new();
        for line in inputs {
            match classify_line(line, &paths, tmp.path()) {
                InputLine::SqlFile(abs) => {
                    if let Some(label) = resolve_sql_to_label(&abs, &graph, tmp.path()) {
                        result.push(label);
                    }
                }
                InputLine::YamlFile(abs) => {
                    result.extend(expand_yaml_names(&abs));
                }
                InputLine::ModelName(name) => result.push(name),
                InputLine::Ignore => {}
            }
        }
        assert_eq!(result, vec!["stg_orders", "raw.orders", "raw.customers"]);
    }

    // The project lives in a sub-directory (`dbt/`) of the working directory:
    // the input line is relative to the working directory, while the node's
    // file_path is relative to the project directory.
    #[test]
    fn test_subdir_project_path_resolution() {
        let tmp = tempfile::tempdir().unwrap();
        let dbt_dir = tmp.path().join("dbt");
        let models_dir = dbt_dir.join("models");
        fs::create_dir_all(&models_dir).unwrap();

        let paths = make_resolved_paths(&dbt_dir);

        let mut graph = LineageGraph::new();
        let mut node = make_node("model.stg_orders", "stg_orders", NodeType::Model);
        node.file_path = Some(PathBuf::from("models/stg_orders.sql"));
        graph.add_node(node);

        let line = "dbt/models/stg_orders.sql";
        // Classify relative to the working directory (tmp) ...
        match classify_line(line, &paths, tmp.path()) {
            InputLine::SqlFile(abs_path) => {
                // ... and resolve relative to the project directory (tmp/dbt).
                let label = resolve_sql_to_label(&abs_path, &graph, &dbt_dir);
                assert_eq!(label, Some("stg_orders".to_string()));
            }
            other => panic!("Expected SqlFile, got {:?}", std::mem::discriminant(&other)),
        }
    }

    // ---- stdin file-type assumptions (Unix only) ----

    // /dev/null reports a file type that is neither FIFO nor regular file, i.e.
    // neither of the two kinds read_stdin_lines accepts on Unix.
    #[cfg(unix)]
    #[test]
    fn test_dev_null_is_not_fifo_or_file() {
        use std::os::unix::fs::FileTypeExt;

        let f = std::fs::File::open("/dev/null").unwrap();
        let ft = f.metadata().unwrap().file_type();
        assert!(!ft.is_fifo());
        assert!(!ft.is_file());
    }

    // A named temporary file reports the regular-file type.
    #[cfg(unix)]
    #[test]
    fn test_regular_file_is_file() {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let ft = tmp.as_file().metadata().unwrap().file_type();
        assert!(ft.is_file());
    }
}