hinge 0.1.0

SQL-native ELT engine — dependency graph resolved automatically from FROM/JOIN clauses, parallel execution, single binary
Documentation
use crate::{Asset, AssetKind, ExecutorError};
use snowflake_api::SnowflakeApi;
use std::fmt::Write;

pub async fn run_asset(api: &SnowflakeApi, asset: &Asset) -> Result<(), ExecutorError> {
    let sql = build_ddl(asset)?;
    api.exec(&sql)
        .await
        .map(|_| ())
        .map_err(|e| ExecutorError::Run {
            asset:   asset.to_string(),
            message: e.to_string(),
        })
}

fn build_ddl(asset: &Asset) -> Result<String, ExecutorError> {
    let schema = quote_ident(asset.reference().schema());
    let name   = quote_ident(asset.reference().name());
    let source = asset.source().as_str();

    let sql = match asset.kind() {
        AssetKind::View => {
            format!("CREATE OR REPLACE VIEW {schema}.{name} AS\n{source}")
        }
        AssetKind::Table => {
            let mut sql = format!("CREATE OR REPLACE TABLE {schema}.{name}");
            if let Some(cluster_by) = asset.header("snowflake.cluster_by") {
                write!(sql, " CLUSTER BY ({cluster_by})").unwrap();
            }
            write!(sql, " AS\n{source}").unwrap();
            sql
        }
        AssetKind::MaterializedView => {
            // Snowflake materialized views require Enterprise edition and have
            // restrictions (single base table, no aggregations). The source
            // SQL must satisfy these constraints.
            format!("CREATE OR REPLACE MATERIALIZED VIEW {schema}.{name} AS\n{source}")
        }
    };

    Ok(sql)
}

fn quote_ident(value: &str) -> String {
    // Snowflake double-quotes identifiers; an embedded " is escaped as "".
    format!("\"{}\"", value.replace('"', "\"\""))
}

#[cfg(test)]
mod tests {
    use super::{build_ddl, quote_ident};
    use crate::{Asset, AssetKind, AssetReference, AssetSource};
    use std::collections::BTreeMap;

    fn asset(kind: AssetKind, schema: &str, name: &str) -> Asset {
        Asset::new(
            kind,
            AssetReference::new(schema, name),
            AssetSource::new("SELECT 1").unwrap(),
        )
    }

    #[test]
    fn quote_ident_escapes_double_quotes() {
        assert_eq!(quote_ident(r#"a"b"#), r#""a""b""#);
    }

    #[test]
    fn build_ddl_view() {
        let sql = build_ddl(&asset(AssetKind::View, "staging", "orders")).unwrap();
        assert!(sql.starts_with("CREATE OR REPLACE VIEW \"staging\".\"orders\" AS"));
    }

    #[test]
    fn build_ddl_table_without_cluster() {
        let sql = build_ddl(&asset(AssetKind::Table, "mart", "revenue")).unwrap();
        assert!(sql.starts_with("CREATE OR REPLACE TABLE \"mart\".\"revenue\" AS"));
        assert!(!sql.contains("CLUSTER BY"));
    }

    #[test]
    fn build_ddl_table_with_cluster() {
        let mut headers = BTreeMap::new();
        headers.insert("snowflake.cluster_by".to_string(), "event_date, user_id".to_string());
        let a = Asset::with_headers(
            AssetKind::Table,
            AssetReference::new("mart", "events"),
            AssetSource::new("SELECT 1").unwrap(),
            headers,
        );
        let sql = build_ddl(&a).unwrap();
        assert!(sql.contains("CLUSTER BY (event_date, user_id)"));
    }

    #[test]
    fn build_ddl_materialized_view() {
        let sql = build_ddl(&asset(AssetKind::MaterializedView, "mart", "mv")).unwrap();
        assert!(sql.starts_with("CREATE OR REPLACE MATERIALIZED VIEW"));
    }
}