// hinge 0.1.0
//
// SQL-native ELT engine — dependency graph resolved automatically from FROM/JOIN clauses, parallel execution, single binary.
// Documentation
use crate::{Asset, AssetKind, ExecutorError};
use duckdb::Connection;
use std::sync::{Arc, Mutex};

/// Build the DDL batch for `asset` and run it on the shared DuckDB connection.
///
/// The DuckDB call is synchronous, so the work is handed to
/// `tokio::task::spawn_blocking` and awaited from here.
///
/// # Errors
///
/// Returns [`ExecutorError::Run`], labelled with the asset's display string,
/// when either:
/// * the blocking task could not be joined (message prefixed with
///   `blocking task panicked:`), or
/// * `execute_batch` itself failed (message is the stringified DuckDB error).
pub async fn run_asset(
    conn: Arc<Mutex<Connection>>,
    asset: &Asset,
) -> Result<(), ExecutorError> {
    // Everything the blocking closure needs must be owned, so render the SQL
    // and the label up front.
    let sql = build_batch(asset);
    let asset_label = asset.to_string();

    let handle = tokio::task::spawn_blocking(move || -> Result<(), String> {
        // A poisoned mutex is not treated as fatal: take the guard anyway.
        let guard = match conn.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };

        match guard.execute_batch(&sql) {
            Ok(()) => Ok(()),
            Err(db_err) => Err(db_err.to_string()),
        }
    });

    match handle.await {
        Ok(Ok(())) => Ok(()),
        // duckdb::Error, already rendered to a string inside the task
        Ok(Err(message)) => Err(ExecutorError::Run {
            asset: asset_label,
            message,
        }),
        // JoinError: the blocking task did not run to completion
        Err(join_err) => Err(ExecutorError::Run {
            asset: asset_label,
            message: format!("blocking task panicked: {join_err}"),
        }),
    }
}

// ── DDL building ──────────────────────────────────────────────────────────────

/// Renders the SQL batch for `asset`: a `CREATE SCHEMA IF NOT EXISTS`
/// statement, a `;` and newline, then the `CREATE OR REPLACE ...` statement
/// for the asset itself.
///
/// Schema and object names are passed through [`quote_ident`]; the asset's
/// source SQL is appended verbatim after `AS`.
fn build_batch(asset: &Asset) -> String {
    let target = asset.reference();
    let schema_ident = quote_ident(target.schema());
    let object_ident = quote_ident(target.name());
    let body = asset.source().as_str();

    // Only the object keyword differs between asset kinds, so pick it here
    // and share a single statement template below.
    let object_kind = match asset.kind() {
        AssetKind::View => "VIEW",
        AssetKind::Table => "TABLE",
        AssetKind::MaterializedView => {
            // DuckDB has no MATERIALIZED VIEW DDL, so the asset is created as
            // a TABLE. It is replaced on every run, which gives the same
            // refresh semantics for an ELT pipeline.
            tracing::debug!(
                "DuckDB: materialised_view '{}' will be created as a table",
                asset
            );
            "TABLE"
        }
    };

    // The schema statement goes first: DuckDB requires the schema to exist
    // before any object is created in it.
    format!(
        "CREATE SCHEMA IF NOT EXISTS {schema_ident};\nCREATE OR REPLACE {object_kind} {schema_ident}.{object_ident} AS\n{body}"
    )
}

/// Wraps `value` in double quotes, doubling every embedded `"` so the result
/// is a single double-quoted SQL identifier.
fn quote_ident(value: &str) -> String {
    // Two extra bytes for the surrounding quotes; embedded quotes grow on demand.
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('"');
    for ch in value.chars() {
        if ch == '"' {
            // Escape by doubling, the double-quoted identifier convention.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::{build_batch, quote_ident};
    use crate::{Asset, AssetKind, AssetReference, AssetSource};

    /// Fixture: an asset of `kind` at `schema.name` whose source is a trivial query.
    fn asset(kind: AssetKind, schema: &str, name: &str) -> Asset {
        let reference = AssetReference::new(schema, name);
        let source = AssetSource::new("SELECT 1 AS n").unwrap();
        Asset::new(kind, reference, source)
    }

    /// Embedded double quotes are doubled and the whole value is wrapped in quotes.
    #[test]
    fn quote_ident_escapes_double_quotes() {
        let quoted = quote_ident(r#"my"schema"#);
        assert_eq!(quoted, r#""my""schema""#);
    }

    /// A view produces schema creation plus `CREATE OR REPLACE VIEW`, never a table.
    #[test]
    fn build_batch_view() {
        let view = asset(AssetKind::View, "staging", "orders");
        let sql = build_batch(&view);

        assert!(sql.contains(r#"CREATE SCHEMA IF NOT EXISTS "staging""#));
        assert!(sql.contains(r#"CREATE OR REPLACE VIEW "staging"."orders" AS"#));
        assert!(!sql.contains("TABLE"));
    }

    /// A table produces `CREATE OR REPLACE TABLE` with a quoted, qualified name.
    #[test]
    fn build_batch_table() {
        let table = asset(AssetKind::Table, "mart", "revenue");
        let sql = build_batch(&table);

        assert!(sql.contains(r#"CREATE OR REPLACE TABLE "mart"."revenue" AS"#));
    }

    /// DuckDB doesn't support MATERIALIZED VIEW, so the asset is mapped to TABLE.
    #[test]
    fn build_batch_materialized_view_maps_to_table() {
        let mv = asset(AssetKind::MaterializedView, "mart", "mv");
        let sql = build_batch(&mv);

        assert!(sql.contains("CREATE OR REPLACE TABLE"));
        assert!(!sql.contains("MATERIALIZED VIEW"));
    }
}