hinge 0.1.0

SQL-native ELT engine — dependency graph resolved automatically from FROM/JOIN clauses, parallel execution, single binary
Documentation
mod executor;
mod utils;

pub use executor::{DuckDbConnectionError, DuckDbExecutor};

use crate::application::{Executor, ExecutorError};
use crate::domain::Asset;
use async_trait::async_trait;
use std::time::Instant;
use super::utils::{fmt_duration, SLOW_THRESHOLD};
use utils::run_asset;

#[async_trait]
impl Executor for DuckDbExecutor {
    /// Runs a single asset on this executor's connection and logs its duration.
    ///
    /// A `debug` event ("starting") is emitted before the work begins. After a
    /// successful run exactly one completion event is emitted: `warn`
    /// ("slow query") when the elapsed time reaches `SLOW_THRESHOLD`, otherwise
    /// `info` ("done").
    ///
    /// # Errors
    ///
    /// Any failure reported by `run_asset` is propagated through `?`; in that
    /// case no completion event is logged.
    async fn run(&self, asset: &Asset) -> Result<(), ExecutorError> {
        tracing::debug!(asset = %asset, kind = %asset.kind(), "starting");

        // Time only the call that performs the work, not the logging around it.
        let started_at = Instant::now();
        run_asset(self.conn(), asset).await?;
        let took = started_at.elapsed();

        // Fast path: under the threshold, report normally and finish.
        if took < SLOW_THRESHOLD {
            tracing::info!(
                asset = %asset,
                kind = %asset.kind(),
                elapsed = %fmt_duration(took),
                "done"
            );
            return Ok(());
        }

        // At or above the threshold: surface the run as a warning and include
        // the threshold itself so the log line is self-explanatory.
        tracing::warn!(
            asset = %asset,
            kind = %asset.kind(),
            elapsed = %fmt_duration(took),
            threshold = %fmt_duration(SLOW_THRESHOLD),
            "slow query"
        );
        Ok(())
    }
}

// ── Integration tests ─────────────────────────────────────────────────────────
//
// DuckDB in-memory databases are cheap to spin up, so we can test the full
// executor path (including spawn_blocking) here.

#[cfg(test)]
mod tests {
    use super::DuckDbExecutor;
    use crate::{Asset, AssetKind, AssetReference, AssetSource, Executor};

    /// Connection URL shared by every test in this module.
    const MEMORY_URL: &str = "duckdb://:memory:";

    /// Builds an `Asset` of the given kind from a schema/name pair and its SQL.
    ///
    /// Panics if `AssetSource::new` rejects the SQL text.
    fn build_asset(kind: AssetKind, schema: &str, name: &str, sql: &str) -> Asset {
        let reference = AssetReference::new(schema, name);
        let source = AssetSource::new(sql).unwrap();
        Asset::new(kind, reference, source)
    }

    /// Running a `View` asset should succeed.
    #[tokio::test]
    async fn run_creates_view_in_memory() {
        let duck = DuckDbExecutor::from_url(MEMORY_URL).unwrap();
        let view = build_asset(AssetKind::View, "staging", "numbers", "SELECT 1 AS n");

        let outcome = duck.run(&view).await;
        outcome.expect("should create view");
    }

    /// Running a `Table` asset should succeed.
    #[tokio::test]
    async fn run_creates_table_in_memory() {
        let duck = DuckDbExecutor::from_url(MEMORY_URL).unwrap();
        let table = build_asset(AssetKind::Table, "mart", "totals", "SELECT 42 AS total");

        let outcome = duck.run(&table).await;
        outcome.expect("should create table");
    }

    /// Running a `MaterializedView` asset should succeed; it is expected to be
    /// mapped to CREATE OR REPLACE TABLE.
    #[tokio::test]
    async fn run_materialized_view_as_table() {
        let duck = DuckDbExecutor::from_url(MEMORY_URL).unwrap();
        let mv = build_asset(
            AssetKind::MaterializedView,
            "mart",
            "mv_sales",
            "SELECT 1 AS sale",
        );

        let outcome = duck.run(&mv).await;
        outcome.expect("materialized view should run as table");
    }

    /// A view that selects from a table runs once that table has been built on
    /// the same executor.
    #[tokio::test]
    async fn run_respects_dependency_order() {
        let duck = DuckDbExecutor::from_url(MEMORY_URL).unwrap();

        let upstream = build_asset(AssetKind::Table, "raw", "events", "SELECT 1 AS id");
        let downstream = build_asset(
            AssetKind::View,
            "staging",
            "events",
            "SELECT id FROM raw.events",
        );

        // Upstream first, then the view that reads from it.
        for (label, step) in [("raw.events", &upstream), ("staging.events", &downstream)] {
            duck.run(step).await.expect(label);
        }
    }
}