use std::io::{self, BufRead, IsTerminal};
use std::path::{Path, PathBuf};
use path_slash::PathExt as _;
use crate::graph::types::LineageGraph;
use crate::parser::project::ResolvedPaths;
use crate::parser::yaml_schema;
/// Classification of a single line of stdin input.
///
/// Derives `Debug`/`PartialEq` so values can be printed in diagnostics and
/// compared directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
enum InputLine {
    /// A `.sql` file located under one of the project's dbt paths (normalized path).
    SqlFile(PathBuf),
    /// A `.yml` / `.yaml` file located under one of the project's dbt paths (normalized path).
    YamlFile(PathBuf),
    /// A bare name that is passed through unchanged as a node name.
    ModelName(String),
    /// Input that is not relevant and must be skipped.
    Ignore,
}
/// Canonicalizes `path`, tolerating a tail of components that do not exist on disk.
///
/// If the whole path can be canonicalized, that result is returned. Otherwise the
/// function walks up towards the root until it finds an ancestor that can be
/// canonicalized, and re-attaches the remaining (non-existent) components to it.
/// `..` components inside the non-existent tail are resolved lexically: each one
/// cancels the component that precedes it instead of being silently dropped.
///
/// When no ancestor can be canonicalized at all, `path` is returned unchanged.
pub(crate) fn normalize_path(path: &Path) -> PathBuf {
    // Components of the non-existent tail, collected from last to first.
    let mut tail = Vec::new();
    // Number of `..` components seen that still have to cancel a preceding component.
    let mut pending_parents = 0usize;
    let mut current = path.to_path_buf();

    loop {
        if let Ok(mut resolved) = current.canonicalize() {
            // Any `..` left over refers to directories of the resolved ancestor.
            for _ in 0..pending_parents {
                resolved.pop();
            }
            resolved.extend(tail.into_iter().rev());
            return resolved;
        }

        match current.components().next_back() {
            Some(std::path::Component::ParentDir) => pending_parents += 1,
            Some(std::path::Component::Normal(name)) => {
                if pending_parents > 0 {
                    // This component is cancelled by a later `..`.
                    pending_parents -= 1;
                } else {
                    tail.push(name.to_owned());
                }
            }
            // Root, prefix and `.` components contribute nothing to the tail.
            _ => {}
        }

        if !current.pop() {
            break;
        }
    }

    path.to_path_buf()
}
/// Anchors `path_str` at `cwd` unless it is already absolute.
///
/// `Path::join` replaces the base when the joined path is absolute, so a single
/// call covers both the relative and the absolute case.
fn to_absolute(path_str: &str, cwd: &Path) -> PathBuf {
    cwd.join(path_str)
}
/// Reads the non-blank lines of standard input, trimmed of surrounding whitespace.
///
/// Returns an empty vector when stdin is a terminal. On Unix it also returns an
/// empty vector when stdin's metadata cannot be read or when stdin is neither a
/// FIFO nor a regular file. Reading stops at the first line that fails to read.
pub fn read_stdin_lines() -> Vec<String> {
    let input = io::stdin();
    if input.is_terminal() {
        return Vec::new();
    }

    #[cfg(unix)]
    {
        use std::os::unix::fs::FileTypeExt;
        use std::os::unix::io::{AsRawFd, FromRawFd};

        // SAFETY: the descriptor is taken from the live `Stdin` handle above, and the
        // temporary `File` is wrapped in `ManuallyDrop` so it never closes that
        // descriptor; it is only used to query metadata.
        let borrowed = std::mem::ManuallyDrop::new(unsafe {
            std::fs::File::from_raw_fd(input.as_raw_fd())
        });
        let Ok(meta) = borrowed.metadata() else {
            return Vec::new();
        };
        let kind = meta.file_type();
        if !(kind.is_fifo() || kind.is_file()) {
            return Vec::new();
        }
    }

    let mut collected = Vec::new();
    for line in input.lock().lines() {
        // A read error ends the input, mirroring `map_while(Result::ok)`.
        let Ok(line) = line else { break };
        let trimmed = line.trim();
        if !trimmed.is_empty() {
            collected.push(trimmed.to_string());
        }
    }
    collected
}
fn classify_line(line: &str, resolved_paths: &ResolvedPaths, cwd: &Path) -> InputLine {
let path = Path::new(line);
match path.extension().and_then(|e| e.to_str()) {
Some("sql") => {
let abs = normalize_path(&to_absolute(line, cwd));
if is_under_dbt_paths(&abs, resolved_paths) {
InputLine::SqlFile(abs)
} else {
InputLine::Ignore
}
}
Some("yml" | "yaml") => {
let abs = normalize_path(&to_absolute(line, cwd));
if is_under_dbt_paths(&abs, resolved_paths) {
InputLine::YamlFile(abs)
} else {
InputLine::Ignore
}
}
Some(ext) => {
if line.contains('/') || line.contains('\\') || is_common_file_extension(ext) {
InputLine::Ignore
} else {
InputLine::ModelName(line.to_string())
}
}
None => {
if line.contains('/') || line.contains('\\') {
InputLine::Ignore
} else {
InputLine::ModelName(line.to_string())
}
}
}
}
/// Reports whether `ext` (without the leading dot, case-sensitive) is one of the
/// well-known file extensions that never denote a dbt node name.
fn is_common_file_extension(ext: &str) -> bool {
    const NON_DBT_EXTENSIONS: &[&str] = &[
        "md",
        "txt",
        "py",
        "csv",
        "json",
        "toml",
        "cfg",
        "ini",
        "rst",
        "lock",
        "xml",
        "html",
        "htm",
        "js",
        "ts",
        "sh",
        "bat",
        "rs",
        "go",
        "java",
        "rb",
        "c",
        "h",
        "cpp",
        "hpp",
        "swift",
        "kt",
        "log",
        "env",
        "gitignore",
    ];
    NON_DBT_EXTENSIONS.contains(&ext)
}
/// Returns `true` when at least one input looks like a file path rather than a
/// node name: it contains `/` or `\`, or ends in `.sql`, `.yml` or `.yaml`.
pub fn has_path_like_input(inputs: &[String]) -> bool {
    const DBT_FILE_SUFFIXES: [&str; 3] = [".sql", ".yml", ".yaml"];

    for input in inputs {
        let has_separator = input.contains(|c: char| c == '/' || c == '\\');
        let has_dbt_suffix = DBT_FILE_SUFFIXES
            .iter()
            .any(|suffix| input.ends_with(*suffix));
        if has_separator || has_dbt_suffix {
            return true;
        }
    }
    false
}
/// Checks whether `abs_path`, once normalized, lies beneath any of the project's
/// model, seed, snapshot, test or analysis directories.
fn is_under_dbt_paths(abs_path: &Path, resolved_paths: &ResolvedPaths) -> bool {
    let candidate = normalize_path(abs_path);
    let searched_roots = [
        &resolved_paths.model_paths,
        &resolved_paths.seed_paths,
        &resolved_paths.snapshot_paths,
        &resolved_paths.test_paths,
        &resolved_paths.analysis_paths,
    ];
    searched_roots
        .iter()
        .flat_map(|roots| roots.iter())
        .any(|root| candidate.starts_with(root))
}
/// Finds the label of the graph node whose `file_path` equals `abs_path` expressed
/// relative to `project_dir`.
///
/// Both paths are normalized first and the comparison is done on forward-slash
/// strings. Returns `None` when `abs_path` is not inside `project_dir` or when no
/// node carries a matching file path.
fn resolve_sql_to_label(
    abs_path: &Path,
    graph: &LineageGraph,
    project_dir: &Path,
) -> Option<String> {
    let file = normalize_path(abs_path);
    let root = normalize_path(project_dir);
    let wanted = file.strip_prefix(&root).ok()?.to_slash_lossy();

    graph.node_indices().find_map(|idx| {
        let node = &graph[idx];
        // Nodes without a recorded file path can never match.
        let node_path = node.file_path.as_ref()?;
        (node_path.to_slash_lossy() == wanted).then(|| node.label.clone())
    })
}
/// Lists the names declared in the schema file at `abs_path`.
///
/// Source tables come first, as `<source>.<table>`, followed by model names, each
/// group in file order. A file that cannot be read or parsed produces a warning
/// and an empty list.
fn expand_yaml_names(abs_path: &Path) -> Vec<String> {
    let content = match std::fs::read_to_string(abs_path) {
        Err(e) => {
            crate::warn!("could not read {}: {}", abs_path.display(), e);
            return Vec::new();
        }
        Ok(text) => text,
    };
    let schema = match yaml_schema::parse_schema_file(&content, Some(abs_path)) {
        Err(e) => {
            crate::warn!("could not parse {}: {}", abs_path.display(), e);
            return Vec::new();
        }
        Ok(parsed) => parsed,
    };

    let source_tables = schema.sources.iter().flat_map(|source| {
        source
            .tables
            .iter()
            .map(move |table| format!("{}.{}", source.name, table.name))
    });
    let model_names = schema.models.iter().map(|model| model.name.clone());

    source_tables.chain(model_names).collect()
}
/// Turns raw input lines into a de-duplicated list of node names.
///
/// Each line is classified; SQL files are mapped to the label of the matching
/// graph node (with a warning when none matches), YAML files are expanded into the
/// names they declare, bare names are kept as they are, and ignored lines are
/// dropped. The first occurrence of each name determines its position.
pub fn resolve_stdin_inputs(
    lines: &[String],
    graph: &LineageGraph,
    resolved_paths: &ResolvedPaths,
    project_dir: &Path,
    cwd: &Path,
) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    let mut ordered = Vec::new();
    // Appends `name` only the first time it is encountered.
    let mut remember = |name: String| {
        if seen.insert(name.clone()) {
            ordered.push(name);
        }
    };

    for line in lines {
        match classify_line(line, resolved_paths, cwd) {
            InputLine::Ignore => {}
            InputLine::ModelName(name) => remember(name),
            InputLine::YamlFile(abs_path) => {
                for name in expand_yaml_names(&abs_path) {
                    remember(name);
                }
            }
            InputLine::SqlFile(abs_path) => {
                match resolve_sql_to_label(&abs_path, graph, project_dir) {
                    Some(label) => remember(label),
                    None => {
                        crate::warn!("no node found for file {}, skipping.", abs_path.display());
                    }
                }
            }
        }
    }

    ordered
}
// Unit tests for stdin input classification, path normalization, YAML expansion
// and the end-to-end resolution of input lines into node names.
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::types::{NodeData, NodeType};
use std::fs;
// Builds a `ResolvedPaths` in which every path list holds exactly one entry:
// the normalized `<project_dir>/<conventional directory name>`.
fn make_resolved_paths(project_dir: &Path) -> ResolvedPaths {
let norm = |name: &str| vec![normalize_path(&project_dir.join(name))];
ResolvedPaths {
model_paths: norm("models"),
seed_paths: norm("seeds"),
snapshot_paths: norm("snapshots"),
test_paths: norm("tests"),
macro_paths: norm("macros"),
analysis_paths: norm("analyses"),
}
}
// Creates a minimal node with the given id, label and type; every optional
// field is `None` and every list is empty.
fn make_node(unique_id: &str, label: &str, node_type: NodeType) -> NodeData {
NodeData {
unique_id: unique_id.to_string(),
label: label.to_string(),
node_type,
file_path: None,
description: None,
materialization: None,
tags: vec![],
columns: vec![],
exposure: None,
}
}
// --- classify_line ---
// The files referenced below are never created on disk; classification relies
// on `normalize_path` falling back to the nearest existing ancestor.
#[test]
fn test_classify_sql_under_models() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let result = classify_line("models/staging/stg_orders.sql", &paths, tmp.path());
assert!(matches!(result, InputLine::SqlFile(_)));
}
#[test]
fn test_classify_sql_under_snapshots() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let result = classify_line("snapshots/snap_orders.sql", &paths, tmp.path());
assert!(matches!(result, InputLine::SqlFile(_)));
}
#[test]
fn test_classify_sql_under_analyses() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let result = classify_line("analyses/my_analysis.sql", &paths, tmp.path());
assert!(matches!(result, InputLine::SqlFile(_)));
}
// A `.sql` file outside every dbt directory is ignored.
#[test]
fn test_classify_sql_outside_dbt_paths() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let result = classify_line("other/script.sql", &paths, tmp.path());
assert!(matches!(result, InputLine::Ignore));
}
#[test]
fn test_classify_yml_under_models() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let result = classify_line("models/staging/schema.yml", &paths, tmp.path());
assert!(matches!(result, InputLine::YamlFile(_)));
}
#[test]
fn test_classify_yaml_under_models() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let result = classify_line("models/schema.yaml", &paths, tmp.path());
assert!(matches!(result, InputLine::YamlFile(_)));
}
// A YAML file outside every dbt directory (here a CI workflow) is ignored.
#[test]
fn test_classify_yml_outside_dbt_paths() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let result = classify_line(".github/workflows/ci.yml", &paths, tmp.path());
assert!(matches!(result, InputLine::Ignore));
}
// Non-sql/yaml files are ignored even when they sit inside a dbt directory,
// because the line contains a path separator.
#[test]
fn test_classify_non_dbt_extension_with_separator() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
assert!(matches!(
classify_line("seeds/data.csv", &paths, tmp.path()),
InputLine::Ignore
));
assert!(matches!(
classify_line("models/model.py", &paths, tmp.path()),
InputLine::Ignore
));
}
// Without a separator, a well-known file extension is what triggers `Ignore`.
#[test]
fn test_classify_non_dbt_extension_without_separator() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
assert!(matches!(
classify_line("README.md", &paths, tmp.path()),
InputLine::Ignore
));
assert!(matches!(
classify_line("Cargo.toml", &paths, tmp.path()),
InputLine::Ignore
));
assert!(matches!(
classify_line("setup.py", &paths, tmp.path()),
InputLine::Ignore
));
}
// A bare identifier is passed through as a model name.
#[test]
fn test_classify_no_extension() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let result = classify_line("stg_orders", &paths, tmp.path());
assert!(matches!(result, InputLine::ModelName(ref n) if n == "stg_orders"));
}
// `raw.orders` has the "extension" `orders`, which is not a known file
// extension, so it is kept as a name.
#[test]
fn test_classify_source_name() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let result = classify_line("raw.orders", &paths, tmp.path());
assert!(matches!(result, InputLine::ModelName(ref n) if n == "raw.orders"));
}
// --- is_under_dbt_paths ---
#[test]
fn test_is_under_dbt_paths_nested() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let abs = tmp.path().join("models/staging/stg_orders.sql");
assert!(is_under_dbt_paths(&abs, &paths));
}
#[test]
fn test_is_under_dbt_paths_absolute() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let abs = tmp.path().join("models/orders.sql");
assert!(is_under_dbt_paths(&abs, &paths));
}
#[test]
fn test_is_not_under_dbt_paths() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let abs = tmp.path().join("other/file.sql");
assert!(!is_under_dbt_paths(&abs, &paths));
}
// --- resolve_sql_to_label ---
// NOTE(review): these two tests use the literal directory `/project`, which
// presumably does not exist on the test machine — confirm.
#[test]
fn test_resolve_sql_to_label_found() {
let project_dir = Path::new("/project");
let mut graph = LineageGraph::new();
let mut node = make_node("model.stg_orders", "stg_orders", NodeType::Model);
node.file_path = Some(PathBuf::from("models/staging/stg_orders.sql"));
graph.add_node(node);
let abs = Path::new("/project/models/staging/stg_orders.sql");
let result = resolve_sql_to_label(abs, &graph, project_dir);
assert_eq!(result, Some("stg_orders".to_string()));
}
#[test]
fn test_resolve_sql_to_label_not_found() {
let project_dir = Path::new("/project");
let graph = LineageGraph::new();
let abs = Path::new("/project/models/nonexistent.sql");
let result = resolve_sql_to_label(abs, &graph, project_dir);
assert_eq!(result, None);
}
// --- expand_yaml_names ---
// Source tables are reported as `<source>.<table>`.
#[test]
fn test_expand_yaml_sources() {
let tmp = tempfile::tempdir().unwrap();
let yaml_path = tmp.path().join("schema.yml");
fs::write(
&yaml_path,
r#"
sources:
- name: raw
tables:
- name: orders
- name: customers
"#,
)
.unwrap();
let names = expand_yaml_names(&yaml_path);
assert_eq!(names, vec!["raw.orders", "raw.customers"]);
}
// Models are reported by their plain name.
#[test]
fn test_expand_yaml_models() {
let tmp = tempfile::tempdir().unwrap();
let yaml_path = tmp.path().join("schema.yml");
fs::write(
&yaml_path,
r#"
models:
- name: stg_orders
- name: stg_customers
"#,
)
.unwrap();
let names = expand_yaml_names(&yaml_path);
assert_eq!(names, vec!["stg_orders", "stg_customers"]);
}
// Sources are listed before models when both are present.
#[test]
fn test_expand_yaml_mixed() {
let tmp = tempfile::tempdir().unwrap();
let yaml_path = tmp.path().join("schema.yml");
fs::write(
&yaml_path,
r#"
sources:
- name: raw
tables:
- name: orders
models:
- name: stg_orders
"#,
)
.unwrap();
let names = expand_yaml_names(&yaml_path);
assert_eq!(names, vec!["raw.orders", "stg_orders"]);
}
// An unreadable file yields an empty list rather than an error.
#[test]
fn test_expand_yaml_file_not_found() {
let names = expand_yaml_names(Path::new("/nonexistent/schema.yml"));
assert!(names.is_empty());
}
#[test]
fn test_expand_yaml_empty_file() {
let tmp = tempfile::tempdir().unwrap();
let yaml_path = tmp.path().join("schema.yml");
fs::write(&yaml_path, "").unwrap();
let names = expand_yaml_names(&yaml_path);
assert!(names.is_empty());
}
// --- has_path_like_input ---
#[test]
fn test_has_path_like_input_with_paths() {
assert!(has_path_like_input(&["models/foo.sql".into()]));
assert!(has_path_like_input(&[
"stg_orders".into(),
"models/bar.yml".into()
]));
assert!(has_path_like_input(&["schema.yaml".into()]));
}
#[test]
fn test_has_path_like_input_model_names_only() {
assert!(!has_path_like_input(&["stg_orders".into()]));
assert!(!has_path_like_input(&[
"raw.orders".into(),
"customers".into()
]));
}
// --- resolve_stdin_inputs ---
// A bare name is returned even though the graph is empty: names are not
// validated against the graph here.
#[test]
fn test_resolve_stdin_model_name() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let graph = LineageGraph::new();
let lines = vec!["stg_orders".to_string()];
let result = resolve_stdin_inputs(&lines, &graph, &paths, tmp.path(), tmp.path());
assert_eq!(result, vec!["stg_orders"]);
}
#[test]
fn test_resolve_stdin_ignores_non_dbt() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let graph = LineageGraph::new();
let lines = vec!["docs/README.md".to_string(), "seeds/data.csv".to_string()];
let result = resolve_stdin_inputs(&lines, &graph, &paths, tmp.path(), tmp.path());
assert!(result.is_empty());
}
// The same node referenced once by file and once by name appears only once.
#[test]
fn test_resolve_stdin_deduplicates() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let mut graph = LineageGraph::new();
let mut node = make_node("model.stg_orders", "stg_orders", NodeType::Model);
node.file_path = Some(PathBuf::from("models/stg_orders.sql"));
graph.add_node(node);
let models_dir = tmp.path().join("models");
fs::create_dir_all(&models_dir).unwrap();
let lines = vec![
"models/stg_orders.sql".to_string(),
"stg_orders".to_string(),
];
let result = resolve_stdin_inputs(&lines, &graph, &paths, tmp.path(), tmp.path());
assert_eq!(result, vec!["stg_orders"]);
}
#[test]
fn test_resolve_stdin_ignores_root_files() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let graph = LineageGraph::new();
let lines = vec![
"README.md".to_string(),
"Cargo.toml".to_string(),
"stg_orders".to_string(),
];
let result = resolve_stdin_inputs(&lines, &graph, &paths, tmp.path(), tmp.path());
assert_eq!(result, vec!["stg_orders"]);
}
// --- classification combined with resolution ---
// `InputLine` has no `Debug` impl, so failures print the enum discriminant.
#[test]
fn test_classify_and_resolve_sql() {
let tmp = tempfile::tempdir().unwrap();
let paths = make_resolved_paths(tmp.path());
let mut graph = LineageGraph::new();
let mut node = make_node("model.stg_orders", "stg_orders", NodeType::Model);
node.file_path = Some(PathBuf::from("models/staging/stg_orders.sql"));
graph.add_node(node);
let line = "models/staging/stg_orders.sql";
match classify_line(line, &paths, tmp.path()) {
InputLine::SqlFile(abs_path) => {
let label = resolve_sql_to_label(&abs_path, &graph, tmp.path());
assert_eq!(label, Some("stg_orders".to_string()));
}
other => panic!("Expected SqlFile, got {:?}", std::mem::discriminant(&other)),
}
}
#[test]
fn test_classify_and_resolve_yaml() {
let tmp = tempfile::tempdir().unwrap();
let models_dir = tmp.path().join("models");
fs::create_dir_all(&models_dir).unwrap();
fs::write(
models_dir.join("schema.yml"),
"sources:\n - name: raw\n tables:\n - name: orders\n",
)
.unwrap();
let paths = make_resolved_paths(tmp.path());
let line = "models/schema.yml";
match classify_line(line, &paths, tmp.path()) {
InputLine::YamlFile(abs_path) => {
let names = expand_yaml_names(&abs_path);
assert_eq!(names, vec!["raw.orders"]);
}
other => panic!(
"Expected YamlFile, got {:?}",
std::mem::discriminant(&other)
),
}
}
// Mixed input: a SQL file, a YAML file, a bare name and two irrelevant files.
// Only the first three contribute names, in input order.
#[test]
fn test_classify_and_resolve_mixed() {
let tmp = tempfile::tempdir().unwrap();
let models_dir = tmp.path().join("models");
fs::create_dir_all(models_dir.join("staging")).unwrap();
fs::write(
models_dir.join("schema.yml"),
"sources:\n - name: raw\n tables:\n - name: orders\n",
)
.unwrap();
let paths = make_resolved_paths(tmp.path());
let mut graph = LineageGraph::new();
let mut node = make_node("model.stg_orders", "stg_orders", NodeType::Model);
node.file_path = Some(PathBuf::from("models/staging/stg_orders.sql"));
graph.add_node(node);
let inputs = vec![
"models/staging/stg_orders.sql",
"models/schema.yml",
"raw.customers",
".github/workflows/ci.yml",
"docs/README.md",
];
let mut result = Vec::new();
for line in inputs {
match classify_line(line, &paths, tmp.path()) {
InputLine::SqlFile(abs) => {
if let Some(label) = resolve_sql_to_label(&abs, &graph, tmp.path()) {
result.push(label);
}
}
InputLine::YamlFile(abs) => {
result.extend(expand_yaml_names(&abs));
}
InputLine::ModelName(name) => result.push(name),
InputLine::Ignore => {}
}
}
assert_eq!(result, vec!["stg_orders", "raw.orders", "raw.customers"]);
}
// The dbt project lives in a `dbt/` subdirectory while input paths are relative
// to the parent directory (the cwd); the node path is relative to the project.
#[test]
fn test_subdir_project_path_resolution() {
let tmp = tempfile::tempdir().unwrap();
let dbt_dir = tmp.path().join("dbt");
let models_dir = dbt_dir.join("models");
fs::create_dir_all(&models_dir).unwrap();
let paths = make_resolved_paths(&dbt_dir);
let mut graph = LineageGraph::new();
let mut node = make_node("model.stg_orders", "stg_orders", NodeType::Model);
node.file_path = Some(PathBuf::from("models/stg_orders.sql"));
graph.add_node(node);
let line = "dbt/models/stg_orders.sql";
match classify_line(line, &paths, tmp.path()) {
InputLine::SqlFile(abs_path) => {
let label = resolve_sql_to_label(&abs_path, &graph, &dbt_dir);
assert_eq!(label, Some("stg_orders".to_string()));
}
other => panic!("Expected SqlFile, got {:?}", std::mem::discriminant(&other)),
}
}
// --- file-type assumptions behind the Unix check in read_stdin_lines ---
// `/dev/null` is neither a FIFO nor a regular file, so stdin redirected from it
// is treated as "no input".
#[cfg(unix)]
#[test]
fn test_dev_null_is_not_fifo_or_file() {
use std::os::unix::fs::FileTypeExt;
let f = std::fs::File::open("/dev/null").unwrap();
let ft = f.metadata().unwrap().file_type();
assert!(!ft.is_fifo());
assert!(!ft.is_file());
}
// A regular file reports `is_file()`, the case accepted for redirected stdin.
#[cfg(unix)]
#[test]
fn test_regular_file_is_file() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let ft = tmp.as_file().metadata().unwrap().file_type();
assert!(ft.is_file());
}
}