dlin-core 0.2.0

Core library for dbt model lineage analysis
Documentation
use std::collections::HashSet;

use polyglot_sql::{DialectType, Schema};

use crate::parser::cache::hash_str;
use crate::parser::manifest::Manifest;

/// Builds the dotted table name under which a relation is registered.
///
/// * `database` and `schema` both present: `database.schema.name`
/// * only `schema` present: `schema.name`
/// * `schema` absent (with or without a `database`): bare `name`
fn make_fq_table_name(database: Option<&str>, schema: Option<&str>, name: &str) -> String {
    let mut qualified = String::new();
    // The database prefix is emitted only together with a schema; a database
    // supplied without a schema is ignored.
    if let Some(schema_name) = schema {
        if let Some(db_name) = database {
            qualified.push_str(db_name);
            qualified.push('.');
        }
        qualified.push_str(schema_name);
        qualified.push('.');
    }
    qualified.push_str(name);
    qualified
}

/// Builds a column schema covering the direct dependencies of `node`.
///
/// Each id in `node.depends_on.nodes` is looked up in `manifest.nodes` first
/// and, only if absent there, in `manifest.sources`. Dependencies that yield
/// no columns are skipped. Every column is registered with
/// `DataType::Unknown`.
///
/// * Node dependencies use the columns from [`resolve_node_columns`] and are
///   registered under their qualified name and, when different, their bare
///   name.
/// * Source dependencies use their declared columns (sorted by name) and are
///   registered under the qualified physical name plus the aliases
///   `schema.identifier`, `identifier`, the source table name and
///   `source_name.name`.
///
/// Returns `None` unless at least one qualified-name registration succeeded;
/// alias registrations are best-effort and their outcome is ignored.
pub(super) fn build_schema_from_manifest(
    manifest: &Manifest,
    node: &crate::parser::manifest::ManifestNode,
    dialect: DialectType,
) -> Option<polyglot_sql::MappingSchema> {
    let mut mapping = polyglot_sql::MappingSchema::new();
    let mut registered_any = false;

    for dep_id in &node.depends_on.nodes {
        if let Some(upstream) = manifest.nodes.get(dep_id) {
            let column_names = resolve_node_columns(upstream, manifest, dialect);
            if column_names.is_empty() {
                continue;
            }
            let typed_cols: Vec<(String, polyglot_sql::expressions::DataType)> = column_names
                .into_iter()
                .map(|column| (column, polyglot_sql::expressions::DataType::Unknown))
                .collect();

            let qualified = make_fq_table_name(
                upstream.database.as_deref(),
                upstream.schema.as_deref(),
                &upstream.name,
            );
            registered_any |= mapping.add_table(&qualified, &typed_cols, None).is_ok();
            // Also expose the table under its bare name when that differs.
            if qualified != upstream.name {
                let _ = mapping.add_table(&upstream.name, &typed_cols, None);
            }
            continue;
        }

        let Some(source) = manifest.sources.get(dep_id) else {
            continue;
        };
        if source.columns.is_empty() {
            continue;
        }

        let mut sorted_names: Vec<String> = source.columns.keys().cloned().collect();
        sorted_names.sort_unstable();
        let typed_cols: Vec<(String, polyglot_sql::expressions::DataType)> = sorted_names
            .into_iter()
            .map(|column| (column, polyglot_sql::expressions::DataType::Unknown))
            .collect();

        // The explicit identifier wins over the source table name.
        let table_ident = source
            .identifier
            .as_deref()
            .unwrap_or(source.name.as_str());
        let physical = make_fq_table_name(
            source.database.as_deref(),
            source.schema.as_deref(),
            table_ident,
        );
        registered_any |= mapping.add_table(&physical, &typed_cols, None).is_ok();

        let schema_scoped = make_fq_table_name(None, source.schema.as_deref(), table_ident);
        let source_scoped = format!("{}.{}", source.source_name, source.name);
        for alias in [
            schema_scoped.as_str(),
            table_ident,
            source.name.as_str(),
            source_scoped.as_str(),
        ] {
            // The physical name has already been registered above.
            if alias == physical {
                continue;
            }
            let _ = mapping.add_table(alias, &typed_cols, None);
        }
    }

    registered_any.then_some(mapping)
}

/// Returns the sorted, de-duplicated column names known for `dep_node`.
///
/// The result combines the keys of `dep_node.columns` with the columns
/// inferred from `dep_node.compiled_code` (when present) by
/// [`infer_output_columns`], using the schema produced by
/// [`build_yaml_schema_for_node`] for that node.
fn resolve_node_columns(
    dep_node: &crate::parser::manifest::ManifestNode,
    manifest: &Manifest,
    dialect: DialectType,
) -> Vec<String> {
    let mut columns: Vec<String> = dep_node.columns.keys().cloned().collect();

    if let Some(code) = &dep_node.compiled_code {
        let declared_schema = build_yaml_schema_for_node(manifest, dep_node);
        columns.extend(infer_output_columns(code, dialect, declared_schema.as_ref()));
    }

    // Sorting followed by `dedup` leaves each distinct name exactly once.
    columns.sort_unstable();
    columns.dedup();
    columns
}

/// Builds a column schema for the direct dependencies of `node` using only
/// the `columns` recorded on the manifest entries (no SQL inference).
///
/// Each id in `node.depends_on.nodes` is looked up in `manifest.nodes` first
/// and, only if absent there, in `manifest.sources`. Entries without columns
/// are skipped. Column names are sorted and registered with
/// `DataType::Unknown`.
///
/// Node dependencies are registered under their qualified name and, when
/// different, their bare name. Source dependencies are registered under the
/// qualified physical name plus the aliases `schema.identifier`,
/// `identifier`, the source table name and `source_name.name`.
///
/// Returns `None` unless at least one qualified-name registration succeeded;
/// alias registrations are best-effort and their outcome is ignored.
pub(super) fn build_yaml_schema_for_node(
    manifest: &Manifest,
    node: &crate::parser::manifest::ManifestNode,
) -> Option<polyglot_sql::MappingSchema> {
    let mut mapping = polyglot_sql::MappingSchema::new();
    let mut registered_any = false;

    for dep_id in &node.depends_on.nodes {
        if let Some(upstream) = manifest.nodes.get(dep_id) {
            // A node id never falls through to the source lookup, even when
            // the node declares no columns.
            if upstream.columns.is_empty() {
                continue;
            }

            let mut sorted_names: Vec<String> = upstream.columns.keys().cloned().collect();
            sorted_names.sort_unstable();
            let typed_cols: Vec<(String, polyglot_sql::expressions::DataType)> = sorted_names
                .into_iter()
                .map(|column| (column, polyglot_sql::expressions::DataType::Unknown))
                .collect();

            let qualified = make_fq_table_name(
                upstream.database.as_deref(),
                upstream.schema.as_deref(),
                &upstream.name,
            );
            registered_any |= mapping.add_table(&qualified, &typed_cols, None).is_ok();
            if qualified != upstream.name {
                let _ = mapping.add_table(&upstream.name, &typed_cols, None);
            }
            continue;
        }

        let Some(source) = manifest.sources.get(dep_id) else {
            continue;
        };
        if source.columns.is_empty() {
            continue;
        }

        let mut sorted_names: Vec<String> = source.columns.keys().cloned().collect();
        sorted_names.sort_unstable();
        let typed_cols: Vec<(String, polyglot_sql::expressions::DataType)> = sorted_names
            .into_iter()
            .map(|column| (column, polyglot_sql::expressions::DataType::Unknown))
            .collect();

        // The explicit identifier wins over the source table name.
        let table_ident = source
            .identifier
            .as_deref()
            .unwrap_or(source.name.as_str());
        let physical = make_fq_table_name(
            source.database.as_deref(),
            source.schema.as_deref(),
            table_ident,
        );
        registered_any |= mapping.add_table(&physical, &typed_cols, None).is_ok();

        let schema_scoped = make_fq_table_name(None, source.schema.as_deref(), table_ident);
        let source_scoped = format!("{}.{}", source.source_name, source.name);
        for alias in [
            schema_scoped.as_str(),
            table_ident,
            source.name.as_str(),
            source_scoped.as_str(),
        ] {
            // The physical name has already been registered above.
            if alias == physical {
                continue;
            }
            let _ = mapping.add_table(alias, &typed_cols, None);
        }
    }

    registered_any.then_some(mapping)
}

/// Parses `sql` with the given `dialect` and returns the columns reported by
/// `extract_select_columns_from_expr` for the parsed expression.
///
/// `schema`, when provided, is forwarded to the extractor as a
/// `&dyn polyglot_sql::Schema`. SQL that fails to parse yields an empty
/// vector rather than an error.
pub(super) fn infer_output_columns(
    sql: &str,
    dialect: DialectType,
    schema: Option<&polyglot_sql::MappingSchema>,
) -> Vec<String> {
    let schema_ref = schema.map(|s| s as &dyn polyglot_sql::Schema);
    match polyglot_sql::parse_one(sql, dialect) {
        Ok(parsed) => {
            crate::parser::columns::extract_select_columns_from_expr(&parsed, schema_ref)
        }
        Err(_) => Vec::new(),
    }
}

/// Computes the column hash of `node`, following its dependencies
/// transitively through `manifest`.
///
/// Entry point for [`hash_node_columns_transitive`]; it starts the traversal
/// with an empty visited set.
pub(super) fn compute_manifest_columns_hash(
    manifest: &Manifest,
    node: &crate::parser::manifest::ManifestNode,
) -> u64 {
    hash_node_columns_transitive(manifest, node, &mut HashSet::new())
}

/// Hashes the column-relevant state of `node` and of everything it depends on.
///
/// The hashed text is a `\0`-joined list made of, in order:
/// 1. the node's own column names, sorted;
/// 2. `sql:<hash>` of its compiled code, when present;
/// 3. a `|` separator;
/// 4. for each dependency id (sorted): the id itself, then, the first time
///    that id is encountered during the whole traversal, either
///    `node:<hash>` of the recursively hashed dependency node or, for a
///    source, its sorted column names followed by `db:`, `schema:` and `id:`
///    entries for whichever of those fields are set.
///
/// `visited` is shared across the recursion so each dependency is expanded at
/// most once.
fn hash_node_columns_transitive(
    manifest: &Manifest,
    node: &crate::parser::manifest::ManifestNode,
    visited: &mut HashSet<String>,
) -> u64 {
    // Only the node's own columns are in `parts` at this point, so sorting
    // the vector sorts exactly those names.
    let mut parts: Vec<String> = node.columns.keys().cloned().collect();
    parts.sort_unstable();

    if let Some(code) = &node.compiled_code {
        let code_hash = hash_str(code);
        parts.push(format!("sql:{code_hash}"));
    }
    parts.push(String::from("|"));

    let mut sorted_dep_ids: Vec<&String> = node.depends_on.nodes.iter().collect();
    sorted_dep_ids.sort_unstable();

    for dep_id in sorted_dep_ids {
        parts.push(dep_id.clone());

        // `insert` returns false when the id was already present, i.e. this
        // dependency has been expanded earlier in the traversal.
        if !visited.insert(dep_id.clone()) {
            continue;
        }

        if let Some(dep_node) = manifest.nodes.get(dep_id) {
            let nested_hash = hash_node_columns_transitive(manifest, dep_node, visited);
            parts.push(format!("node:{nested_hash}"));
            continue;
        }

        let Some(source) = manifest.sources.get(dep_id) else {
            continue;
        };

        let mut source_columns: Vec<String> = source.columns.keys().cloned().collect();
        source_columns.sort_unstable();
        parts.extend(source_columns);

        if let Some(db) = &source.database {
            parts.push(format!("db:{db}"));
        }
        if let Some(schema_name) = &source.schema {
            parts.push(format!("schema:{schema_name}"));
        }
        if let Some(identifier) = &source.identifier {
            parts.push(format!("id:{identifier}"));
        }
    }

    hash_str(&parts.join("\0"))
}