// hinge 0.1.0
//
// SQL-native ELT engine: the dependency graph is resolved automatically from
// FROM/JOIN clauses, models run in parallel, and everything ships as a single binary.
use crate::domain::{Asset, AssetKind, AssetReference, AssetSource, Graph, GraphError};
use regex::Regex;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::Path;
use std::sync::OnceLock;
use walkdir::WalkDir;

/// Errors that can occur while scanning a models directory and building the graph.
/// Everything that can go wrong while scanning a models directory and turning it
/// into a dependency graph.
///
/// The first three variants describe problems with the models tree itself; the
/// last two wrap a lower-level failure, which stays reachable as the error source.
#[derive(Debug, thiserror::Error)]
pub enum BuildGraphError {
    /// The root path handed to the scanner is not a directory.
    #[error("'{0}' is not a directory")]
    NotADirectory(String),

    /// A `.sql` file's location does not yield a `<schema>/<model>` pair.
    #[error("cannot determine schema/name from '{0}' (expected …/<schema>/<model>.sql)")]
    InvalidPath(String),

    /// A `.sql` file has no usable SQL once its header is removed.
    #[error("empty SQL body in '{0}'")]
    EmptySource(String),

    /// Reading from the filesystem failed.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// The collected assets and edges were rejected when assembling the graph.
    #[error("graph error: {0}")]
    InvalidGraph(#[from] GraphError),
}

/// Scans a directory of `.sql` model files and builds a [`Graph`] whose edges
/// are derived automatically from the `FROM` / `JOIN` clauses of each model.
///
/// This unit struct carries no state; its functionality is exposed through
/// associated functions. See [`BuildGraph::from_dir`] for the full contract
/// (directory layout, header syntax, dependency resolution and errors).
pub struct BuildGraph;

impl BuildGraph {
    /// Walk `path` recursively, parse every `.sql` file and return a dependency graph.
    ///
    /// File layout convention:
    /// ```text
    /// <root>/
    ///   <schema>/
    ///     <model>.sql      →  asset reference  = schema.model
    /// ```
    ///
    /// Optional header comment (must be the very first lines):
    /// ```sql
    /// -- @kind: table | view | materialized_view
    /// -- @kind: matview          (alias)
    /// -- @clickhouse.order_by: (event_date, user_id)
    /// -- @clickhouse.partition_by: toYYYYMM(event_date)
    /// SELECT …
    /// ```
    /// Defaults to `View` when the header is absent or the kind is unrecognised.
    /// A leading UTF-8 byte-order mark is ignored.
    ///
    /// Dependencies are extracted from every `FROM <schema>.<table>` and
    /// `JOIN <schema>.<table>` pattern found in the file (comments excluded).
    /// Each reference is matched against the other models by exact spelling
    /// first and then case-insensitively, so `FROM marts.DimUser` and
    /// `FROM MARTS.dimuser` both resolve to `marts/DimUser.sql`.
    /// References that don't resolve to another `.sql` file in the same tree
    /// are silently treated as external sources and ignored.
    /// Directory entries that cannot be read during the walk are skipped.
    ///
    /// # Errors
    ///
    /// - [`BuildGraphError::NotADirectory`] if `path` is not a directory.
    /// - [`BuildGraphError::InvalidPath`] if a file's schema or model name cannot
    ///   be derived from its path (missing or non-UTF-8 component).
    /// - [`BuildGraphError::Io`] if a `.sql` file cannot be read as UTF-8 text.
    /// - [`BuildGraphError::EmptySource`] if `AssetSource::new` rejects the SQL
    ///   body left after the header.
    /// - [`BuildGraphError::InvalidGraph`] if [`Graph::build`] rejects the edges.
    pub fn from_dir(path: impl AsRef<Path>) -> Result<Graph, BuildGraphError> {
        let path = path.as_ref();

        if !path.is_dir() {
            return Err(BuildGraphError::NotADirectory(path.display().to_string()));
        }

        let mut asset_map: HashMap<AssetReference, Asset> = HashMap::new();
        // Lower-cased (schema, name) → reference: case-insensitive fallback used when a
        // FROM/JOIN reference does not match a model's spelling exactly.
        let mut folded: HashMap<(String, String), AssetReference> = HashMap::new();
        // Raw `(schema, name)` pairs as written in each model's SQL.
        let mut raw_deps: HashMap<AssetReference, Vec<(String, String)>> = HashMap::new();

        for entry in WalkDir::new(path)
            .into_iter()
            .filter_map(|e| e.ok())
            .filter(|e| {
                e.file_type().is_file() && e.path().extension().is_some_and(|ext| ext == "sql")
            })
        {
            let file_path = entry.path();
            let (schema, name) = Self::path_parts(file_path)?;
            let reference = AssetReference::new(schema, name);

            let raw = std::fs::read_to_string(file_path)?;
            // Some editors prepend a BOM. `str::trim` does not remove U+FEFF, so without
            // this the header block would go unrecognised and the BOM would leak into
            // the SQL body.
            let content = raw.strip_prefix('\u{feff}').unwrap_or(raw.as_str());

            let headers = Self::parse_headers(content);
            let kind = Self::parse_kind(&headers);
            let source = AssetSource::new(Self::sql_body(content))
                .map_err(|_| BuildGraphError::EmptySource(file_path.display().to_string()))?;

            folded.insert((schema.to_lowercase(), name.to_lowercase()), reference.clone());
            raw_deps.insert(reference.clone(), Self::extract_deps(content));
            asset_map.insert(
                reference.clone(),
                Asset::with_headers(kind, reference, source, headers),
            );
        }

        // Build edges: edges[dep] = [asset, …]
        // "asset depends on dep" → dep's child is asset (dep must run before asset)
        let mut edges: HashMap<Asset, Vec<Asset>> = asset_map
            .values()
            .map(|asset| (asset.clone(), Vec::new()))
            .collect();

        for (reference, deps) in &raw_deps {
            let Some(current) = asset_map.get(reference) else {
                continue;
            };
            // Different spellings of one model (`a.b` vs `A.B`) must yield a single edge.
            let mut linked: HashSet<&AssetReference> = HashSet::new();
            for (schema, name) in deps {
                // No matching model → external source, skipped silently.
                let Some((dep_ref, dep_asset)) =
                    Self::resolve(&asset_map, &folded, schema, name)
                else {
                    continue;
                };
                if linked.insert(dep_ref) {
                    edges
                        .entry(dep_asset.clone())
                        .or_default()
                        .push(current.clone());
                }
            }
        }

        Ok(Graph::build(edges)?)
    }

    /// Resolve a `schema.name` reference to a model of this tree.
    ///
    /// Tries the exact spelling first, then a case-insensitive match via `folded`.
    /// Returns `None` when no model matches, meaning the reference is external.
    fn resolve<'a>(
        assets: &'a HashMap<AssetReference, Asset>,
        folded: &HashMap<(String, String), AssetReference>,
        schema: &str,
        name: &str,
    ) -> Option<(&'a AssetReference, &'a Asset)> {
        assets
            .get_key_value(&AssetReference::new(schema, name))
            .or_else(|| {
                let key = (schema.to_lowercase(), name.to_lowercase());
                folded
                    .get(&key)
                    .and_then(|reference| assets.get_key_value(reference))
            })
    }

    /// `…/staging/orders.sql`  →  `("staging", "orders")`.
    ///
    /// The parent directory's name is the schema and the file stem is the model name.
    ///
    /// # Errors
    ///
    /// [`BuildGraphError::InvalidPath`] when either component is missing or not valid UTF-8.
    fn path_parts(path: &Path) -> Result<(&str, &str), BuildGraphError> {
        let invalid = || BuildGraphError::InvalidPath(path.display().to_string());

        let name = path
            .file_stem()
            .and_then(|s| s.to_str())
            .ok_or_else(invalid)?;

        let schema = path
            .parent()
            .and_then(Path::file_name)
            .and_then(|s| s.to_str())
            .ok_or_else(invalid)?;

        Ok((schema, name))
    }

    /// Read `-- @key: value` metadata from the leading comment block.
    ///
    /// Scanning stops at the first line that does not start with `--` (a blank line
    /// included). Comment lines without an `@key:` entry are skipped without ending
    /// the block. Keys and values are trimmed; a repeated key keeps its last value.
    fn parse_headers(content: &str) -> BTreeMap<String, String> {
        let mut headers = BTreeMap::new();

        for line in content.lines() {
            let trimmed = line.trim();
            if !trimmed.starts_with("--") {
                break;
            }
            let rest = trimmed.trim_start_matches('-').trim();
            let Some(rest) = rest.strip_prefix('@') else {
                continue;
            };
            if let Some((key, value)) = rest.split_once(':') {
                headers.insert(key.trim().to_string(), value.trim().to_string());
            }
        }

        headers
    }

    /// Read `-- @kind: <value>` from parsed headers.
    /// Returns `AssetKind::View` when absent or unrecognised.
    fn parse_kind(headers: &BTreeMap<String, String>) -> AssetKind {
        headers
            .get("kind")
            .and_then(|kind| kind.parse::<AssetKind>().ok())
            .unwrap_or(AssetKind::View)
    }

    /// Strip leading `-- @…` header lines and return the trimmed SQL body.
    ///
    /// Leading lines that start with `--` and contain an `@` anywhere are removed.
    /// The first other line (including a plain comment) starts the body.
    /// Handles both `\n` and `\r\n` line endings.
    fn sql_body(content: &str) -> &str {
        let mut offset = 0;
        let mut pos = 0;
        while pos < content.len() {
            // Slicing at a '\n' index is safe: it is a single-byte UTF-8 character.
            let line_end = content[pos..].find('\n').map_or(content.len(), |i| pos + i);
            let line = content[pos..line_end].trim_end_matches('\r');
            let trimmed = line.trim();
            if trimmed.starts_with("--") && trimmed.contains('@') {
                pos = (line_end + 1).min(content.len());
                offset = pos;
            } else {
                break;
            }
        }
        content[offset..].trim()
    }

    /// Extract every `schema.table` reference from FROM / JOIN clauses.
    ///
    /// Handles both plain and double-quoted identifiers and keeps their original
    /// spelling; case differences are reconciled later by `resolve`. Comments are
    /// stripped first, so commented-out SQL creates no dependencies. Results are in
    /// order of first appearance, without exact duplicates.
    fn extract_deps(content: &str) -> Vec<(String, String)> {
        static RE: OnceLock<Regex> = OnceLock::new();
        let re = RE.get_or_init(|| {
            // The leading `\b` keeps the keywords from matching inside identifiers
            // such as `valid_from` or `rejoin`.
            Regex::new(
                r#"(?i)\b(?:FROM|JOIN)\s+"?([a-zA-Z_][a-zA-Z0-9_]*)"?\."?([a-zA-Z_][a-zA-Z0-9_]*)"?"#,
            )
            .expect("hard-coded regex is valid")
        });

        let stripped = Self::strip_comments(content);
        let mut seen: HashSet<(String, String)> = HashSet::new();
        let mut refs = Vec::new();

        for cap in re.captures_iter(&stripped) {
            let pair = (cap[1].to_string(), cap[2].to_string());
            if seen.insert(pair.clone()) {
                refs.push(pair);
            }
        }

        refs
    }

    /// Remove `--` line comments and `/* */` block comments from SQL.
    ///
    /// A line comment runs up to, but not including, the next `\n`, so line structure
    /// is preserved. A block comment is replaced by a single space because SQL treats
    /// a comment as a token separator: `a/*x*/FROM s.t` must still read `a FROM s.t`.
    /// An unterminated block comment swallows the rest of the input. String literals
    /// are not tracked, so comment markers inside quotes are treated as comments.
    fn strip_comments(sql: &str) -> String {
        let mut out = String::with_capacity(sql.len());
        let mut chars = sql.chars().peekable();
        while let Some(ch) = chars.next() {
            match ch {
                '-' if chars.peek() == Some(&'-') => {
                    while chars.next_if(|&c| c != '\n').is_some() {}
                }
                '/' if chars.peek() == Some(&'*') => {
                    chars.next();
                    loop {
                        match chars.next() {
                            Some('*') if chars.peek() == Some(&'/') => {
                                chars.next();
                                break;
                            }
                            None => break,
                            _ => {}
                        }
                    }
                    out.push(' ');
                }
                _ => out.push(ch),
            }
        }
        out
    }
}