hinge 0.1.0

SQL-native ELT engine — dependency graph resolved automatically from FROM/JOIN clauses, parallel execution, single binary
Documentation
//! SQL-native ELT engine — single binary, no runtime required.
//!
//! Hinge walks a directory of `.sql` files, builds a dependency graph by parsing
//! `FROM` / `JOIN` clauses, and executes every asset in the correct order —
//! running independent assets **in parallel** within each wave.
//!
//! # Quick start
//!
//! ```rust,ignore
//! use hinge::{BuildGraph, RunAll, PostgresExecutor};
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//!     let graph    = BuildGraph::from_dir("models")?;
//!     let executor = PostgresExecutor::new("postgresql://user:pass@localhost/mydb").await?;
//!     RunAll::new(graph, executor).execute().await?;
//!     Ok(())
//! }
//! ```
//!
//! # File layout
//!
//! ```text
//! models/
//!   raw/
//!     events.sql          →  raw.events
//!   staging/
//!     orders.sql          →  staging.orders   (SELECT … FROM raw.events)
//!   mart/
//!     revenue.sql         →  mart.revenue     (SELECT … FROM staging.orders)
//! ```
//!
//! Dependencies are resolved automatically — no `ref()` calls, no manifest files.
//!
//! # Asset kinds
//!
//! Declare the output type with a header comment at the top of the file.
//! Defaults to `view` when absent.
//!
//! ```sql
//! -- @kind: table
//! -- @kind: view
//! -- @kind: materialized_view
//! SELECT …
//! ```
//!
//! # Run modes
//!
//! | Type | Behaviour |
//! |------|-----------|
//! | [`RunAll`] | Full graph in dependency order |
//! | [`RunUpstream`] | All ancestors of a node, then the node itself |
//! | [`RunDownstream`] | The node, then all its descendants |
//! | [`RunBidirectional`] | Both directions from a node |
//!
//! Every run type exposes `.plan()` for a dry-run preview that resolves the
//! execution order without connecting to the database.

mod application;
mod domain;
mod infrastructure;

pub use application::{
    BuildGraph,
    BuildGraphError,
    Engine,
    Executor,
    ExecutorError,
    RunAll,
    RunBidirectional,
    RunDownstream,
    RunUpstream,
};
pub use domain::{
    Asset,
    AssetError,
    AssetKind,
    AssetReference,
    AssetSource,
    Graph,
    GraphError,
};
pub use infrastructure::{
    ClickHouseExecutor,
    DuckDbConnectionError, DuckDbExecutor,
    PostgresExecutor,
    SnowflakeConnectionError, SnowflakeExecutor,
};

// ─── Tests ────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    // ── helpers ────────────────────────────────────────────────────────────────

    /// Builds a `View` asset identified by `schema` and `name` whose SQL body
    /// is the placeholder query `SELECT 1`.
    fn asset(schema: &str, name: &str) -> Asset {
        let reference = AssetReference::new(schema, name);
        let source = AssetSource::new("SELECT 1").unwrap();
        Asset::new(AssetKind::View, reference, source)
    }

    /// Fixture graph with one root, one middle node and two leaves:
    ///
    /// ```text
    /// raw → staging → mart_a
    ///               → mart_b
    /// ```
    ///
    /// Returns the graph followed by the four assets in the order
    /// `(raw, staging, mart_a, mart_b)`; every asset uses the `public` schema.
    fn build_graph() -> (Graph, Asset, Asset, Asset, Asset) {
        let [raw, staging, mart_a, mart_b] =
            ["raw", "staging", "mart_a", "mart_b"].map(|name| asset("public", name));

        // Each key maps to the assets listed as its successors in the diagram.
        let edges: HashMap<Asset, Vec<Asset>> = HashMap::from([
            (raw.clone(), vec![staging.clone()]),
            (staging.clone(), vec![mart_a.clone(), mart_b.clone()]),
            (mart_a.clone(), Vec::new()),
            (mart_b.clone(), Vec::new()),
        ]);

        (Graph::build(edges).unwrap(), raw, staging, mart_a, mart_b)
    }

    // ── mock executor ──────────────────────────────────────────────────────────

    /// Test double for [`Executor`]: each `run` call appends the asset's
    /// `to_string()` rendering to a shared log and reports success.
    #[derive(Clone)]
    struct MockExecutor {
        /// Rendered names of the assets passed to `run`, in call order.
        calls: Arc<Mutex<Vec<String>>>,
    }

    impl MockExecutor {
        /// Creates an executor that records into the caller-supplied log so
        /// the test can inspect it after the run.
        fn with_log(calls: Arc<Mutex<Vec<String>>>) -> Self {
            MockExecutor { calls }
        }
    }

    #[async_trait]
    impl Executor for MockExecutor {
        /// Records `asset` in the log; always returns `Ok(())`.
        async fn run(&self, asset: &Asset) -> Result<(), ExecutorError> {
            let label = asset.to_string();
            // The guard is a temporary, so the lock is released at the end of
            // this statement.
            self.calls.lock().unwrap().push(label);
            Ok(())
        }
    }

    // ── BuildGraph ─────────────────────────────────────────────────────────────

    /// Test helper: ensures `<base>/<schema>/` exists (creating parents as
    /// needed) and writes `sql` to `<model>.sql` inside it.
    ///
    /// Panics if the directory cannot be created or the file cannot be written.
    fn write_sql(base: &std::path::Path, schema: &str, model: &str, sql: &str) {
        let mut target = base.join(schema);
        std::fs::create_dir_all(&target).unwrap();

        target.push(format!("{model}.sql"));
        std::fs::write(&target, sql).unwrap();
    }

    /// A three-file chain written to disk must come out of `topo_sort` with
    /// each model after the model it selects from.
    #[test]
    fn build_graph_resolves_deps() {
        let workspace = tempfile::tempdir().unwrap();
        let models = workspace.path();

        //  raw.events  ──→  staging.orders  ──→  mart.revenue
        let fixtures = [
            ("raw", "events", "SELECT 1"),
            ("staging", "orders", "SELECT id FROM raw.events"),
            ("mart", "revenue", "SELECT sum FROM staging.orders"),
        ];
        for (schema, model, sql) in fixtures {
            write_sql(models, schema, model, sql);
        }

        let graph = BuildGraph::from_dir(models).unwrap();
        let sorted = graph.topo_sort();

        // Render every asset once, then look positions up by name.
        let names: Vec<String> = sorted.iter().map(|a| a.to_string()).collect();
        let index_of = |wanted: &str| names.iter().position(|n| n == wanted).unwrap();

        assert_eq!(sorted.len(), 3);
        assert!(index_of("raw.events") < index_of("staging.orders"), "events before orders");
        assert!(index_of("staging.orders") < index_of("mart.revenue"), "orders before revenue");
    }

    /// A `-- @kind: materialized_view` header line must be parsed into
    /// [`AssetKind::MaterializedView`].
    #[test]
    fn build_graph_kind_header() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        write_sql(root, "mart", "sales",
            "-- @kind: materialized_view\nSELECT 1");

        let graph = BuildGraph::from_dir(root).unwrap();
        let assets: Vec<_> = graph.topo_sort();
        // Check the count before indexing: a loader regression then fails with
        // a readable message instead of an index-out-of-bounds panic, and an
        // unexpected extra asset is caught too.
        assert_eq!(assets.len(), 1, "exactly one asset should be discovered");
        assert_eq!(assets[0].kind(), AssetKind::MaterializedView);
    }

    /// Header lines other than `@kind` (here `@clickhouse.order_by`) must be
    /// retrievable from the loaded asset via `header(..)` with their value intact.
    #[test]
    fn build_graph_preserves_metadata_headers() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        write_sql(root, "mart", "events",
            "-- @kind: table\n-- @clickhouse.order_by: (event_date, user_id)\nSELECT 1");

        let graph = BuildGraph::from_dir(root).unwrap();
        let assets: Vec<_> = graph.topo_sort();
        // Check the count before indexing: a loader regression then fails with
        // a readable message instead of an index-out-of-bounds panic, and an
        // unexpected extra asset is caught too.
        assert_eq!(assets.len(), 1, "exactly one asset should be discovered");
        assert_eq!(
            assets[0].header("clickhouse.order_by"),
            Some("(event_date, user_id)")
        );
    }

    /// A query that selects from a table with no matching file under the root
    /// must still build, yielding a graph that contains only the one model.
    #[test]
    fn build_graph_ignores_external_refs() {
        let scratch = tempfile::tempdir().unwrap();
        let models = scratch.path();

        // raw_db.source is NOT in our tree → should be ignored, no graph error
        let external_query = "SELECT id FROM raw_db.source";
        write_sql(models, "staging", "orders", external_query);

        let graph = BuildGraph::from_dir(models).unwrap();
        let node_count = graph.topo_sort().len();
        assert_eq!(node_count, 1);
    }

    /// Referencing the same upstream table twice in one query (here through a
    /// self-join) must still produce a buildable two-node graph.
    #[test]
    fn build_graph_deduplicates_deps() {
        let scratch = tempfile::tempdir().unwrap();
        let models = scratch.path();

        write_sql(models, "raw", "events", "SELECT 1");
        // multiple references to the same table → no duplicate edges
        let self_join = "SELECT a.id, b.val FROM raw.events a JOIN raw.events b ON a.id = b.id";
        write_sql(models, "staging", "orders", self_join);

        let graph = BuildGraph::from_dir(models).unwrap();
        let sorted = graph.topo_sort();
        assert_eq!(sorted.len(), 2);
    }

    // ── Graph::topo_sort ───────────────────────────────────────────────────────

    /// On the in-memory fixture graph, `topo_sort` must place every asset
    /// after the asset it depends on and return all four assets.
    #[test]
    fn topo_sort_dependencies_before_dependents() {
        let (graph, raw, staging, mart_a, mart_b) = build_graph();
        let sorted = graph.topo_sort();

        let rank = |needle: &Asset| {
            sorted
                .iter()
                .position(|candidate| candidate == needle)
                .unwrap()
        };

        // (earlier, later, failure message)
        let expectations = [
            (&raw, &staging, "raw must come before staging"),
            (&staging, &mart_a, "staging must come before mart_a"),
            (&staging, &mart_b, "staging must come before mart_b"),
        ];
        for (earlier, later, why) in expectations {
            assert!(rank(earlier) < rank(later), "{}", why);
        }
        assert_eq!(sorted.len(), 4);
    }

    // ── RunAll ─────────────────────────────────────────────────────────────────

    /// `RunAll::execute` must run all four fixture assets, each after its
    /// dependency, and `plan()` on an equivalent graph must report four entries.
    #[tokio::test]
    async fn run_all_executes_in_topo_order() {
        let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let (graph, ..) = build_graph();

        let runner = RunAll::new(graph, MockExecutor::with_log(Arc::clone(&log)));
        runner.execute().await.unwrap();

        let executed = log.lock().unwrap().clone();
        assert_eq!(executed.len(), 4);

        let index_of = |name: &str| executed.iter().position(|entry| entry == name).unwrap();
        assert!(index_of("public.raw") < index_of("public.staging"));
        assert!(index_of("public.staging") < index_of("public.mart_a"));
        assert!(index_of("public.staging") < index_of("public.mart_b"));

        // plan() should match
        let (fresh_graph, ..) = build_graph();
        let unused_log = Arc::new(Mutex::new(Vec::new()));
        let plan = RunAll::new(fresh_graph, MockExecutor::with_log(unused_log)).plan();
        assert_eq!(plan.len(), 4);
    }

    // ── RunUpstream ────────────────────────────────────────────────────────────

    /// Running upstream of `mart_a` must execute `raw`, `staging` and finally
    /// `mart_a`, and must not touch the sibling `mart_b`.
    #[tokio::test]
    async fn run_upstream_of_mart_a() {
        let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let (graph, _, _, mart_a, _) = build_graph();

        let runner = RunUpstream::new(graph, MockExecutor::with_log(Arc::clone(&log)));
        runner.execute(&mart_a).await.unwrap();

        let executed = log.lock().unwrap().clone();
        // raw → staging → mart_a  (mart_b excluded)
        assert_eq!(executed.len(), 3);
        assert!(executed.iter().all(|name| name != "public.mart_b"));

        let index_of = |name: &str| executed.iter().position(|entry| entry == name).unwrap();
        assert!(index_of("public.raw") < index_of("public.staging"));
        assert!(index_of("public.staging") < index_of("public.mart_a"));
        assert_eq!(executed.last().map(String::as_str), Some("public.mart_a"));
    }

    // ── RunDownstream ──────────────────────────────────────────────────────────

    /// Running downstream from `staging` must execute `staging` first, then
    /// both marts, and must not touch the upstream `raw`.
    #[tokio::test]
    async fn run_downstream_from_staging() {
        let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let (graph, _, staging, _, _) = build_graph();

        let runner = RunDownstream::new(graph, MockExecutor::with_log(Arc::clone(&log)));
        runner.execute(&staging).await.unwrap();

        let executed = log.lock().unwrap().clone();
        // staging → mart_a, mart_b  (raw excluded)
        assert_eq!(executed.len(), 3);
        assert!(executed.iter().all(|name| name != "public.raw"));
        assert_eq!(executed.first().map(String::as_str), Some("public.staging"));

        let index_of = |name: &str| executed.iter().position(|entry| entry == name).unwrap();
        assert!(index_of("public.staging") < index_of("public.mart_a"));
        assert!(index_of("public.staging") < index_of("public.mart_b"));
    }

    // ── RunBidirectional ───────────────────────────────────────────────────────

    /// Running in both directions from `staging` must execute all four fixture
    /// assets, with `raw` before `staging` and `staging` before each mart.
    #[tokio::test]
    async fn run_bidirectional_from_staging() {
        let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let (graph, _, staging, _, _) = build_graph();

        let runner = RunBidirectional::new(graph, MockExecutor::with_log(Arc::clone(&log)));
        runner.execute(&staging).await.unwrap();

        let executed = log.lock().unwrap().clone();
        // raw → staging → mart_a, mart_b
        assert_eq!(executed.len(), 4);

        let index_of = |name: &str| executed.iter().position(|entry| entry == name).unwrap();
        // (earlier, later, failure message)
        let expectations = [
            ("public.raw", "public.staging", "raw before staging"),
            ("public.staging", "public.mart_a", "staging before mart_a"),
            ("public.staging", "public.mart_b", "staging before mart_b"),
        ];
        for (earlier, later, why) in expectations {
            assert!(index_of(earlier) < index_of(later), "{}", why);
        }
    }
}