hinge 0.1.0

SQL-native ELT engine — dependency graph resolved automatically from FROM/JOIN clauses, parallel execution, single binary
Documentation
use crate::{Asset, AssetKind, ExecutorError};
use clickhouse::Client;
use std::fmt::Write;

/// ClickHouse-specific DDL options read from an asset's `clickhouse.*` headers.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ClickHouseOptions {
    /// Engine expression from `clickhouse.engine`; `MergeTree` when absent or blank.
    engine: String,
    /// Sorting key expression from `clickhouse.order_by`.
    order_by: Option<String>,
    /// Primary key expression from `clickhouse.primary_key`.
    primary_key: Option<String>,
    /// Partitioning expression from `clickhouse.partition_by`.
    partition_by: Option<String>,
    /// Sampling expression from `clickhouse.sample_by`.
    sample_by: Option<String>,
    /// Raw settings list from `clickhouse.settings`, rendered after `SETTINGS`.
    settings: Option<String>,
    /// `clickhouse.populate` set to `1`/`true`/`yes` (case-insensitive); adds `POPULATE`
    /// to materialized view DDL.
    populate: bool,
}

impl ClickHouseOptions {
    /// Engine used when `clickhouse.engine` is missing or blank.
    const DEFAULT_ENGINE: &'static str = "MergeTree";

    /// Reads the options from `asset`'s headers. Header values are trimmed, and blank
    /// values are treated as absent.
    ///
    /// # Errors
    /// Returns [`ExecutorError::Run`] when `require_order_by` is set and the asset is a
    /// table or materialized view on a MergeTree-family engine without a
    /// `clickhouse.order_by` header.
    fn from_asset(asset: &Asset, require_order_by: bool) -> Result<Self, ExecutorError> {
        // Go through `header` like every other option, so that a blank
        // `clickhouse.engine:` falls back to the default instead of rendering `ENGINE = `.
        let engine = header(asset, "clickhouse.engine")
            .unwrap_or_else(|| Self::DEFAULT_ENGINE.to_string());

        let order_by = header(asset, "clickhouse.order_by");
        // Only MergeTree-family engines have a sorting key. Demanding one for engines
        // such as Memory would force callers to emit DDL ClickHouse cannot accept.
        if require_order_by
            && matches!(asset.kind(), AssetKind::Table | AssetKind::MaterializedView)
            && order_by.is_none()
            && Self::is_merge_tree_engine(&engine)
        {
            return Err(ExecutorError::Run {
                asset: asset.to_string(),
                message: "missing '-- @clickhouse.order_by: ...' header; use ClickHouseExecutor::allow_unordered_tables() only for small or intentionally unsorted assets".to_string(),
            });
        }

        Ok(Self {
            engine,
            order_by,
            primary_key: header(asset, "clickhouse.primary_key"),
            partition_by: header(asset, "clickhouse.partition_by"),
            sample_by: header(asset, "clickhouse.sample_by"),
            settings: header(asset, "clickhouse.settings"),
            populate: asset.header("clickhouse.populate").is_some_and(|v| {
                matches!(v.trim().to_ascii_lowercase().as_str(), "1" | "true" | "yes")
            }),
        })
    }

    /// Returns `true` when the engine name (the text before any `(`) ends in `MergeTree`.
    /// This covers `MergeTree`, `ReplacingMergeTree(ver)`, `ReplicatedMergeTree(...)`, etc.
    fn is_merge_tree_engine(engine: &str) -> bool {
        engine
            .split_once('(')
            .map_or(engine, |(name, _)| name)
            .trim()
            .ends_with("MergeTree")
    }

    /// Renders the storage clause (`ENGINE = ... [PARTITION BY ...] [ORDER BY ...]
    /// [PRIMARY KEY ...] [SAMPLE BY ...] [SETTINGS ...]`) appended to CREATE statements.
    fn table_clause(&self) -> String {
        let mut clause = format!("ENGINE = {}", self.engine);
        // `write!` into a `String` cannot fail; the `unwrap`s only discard `fmt::Result`.
        if let Some(partition_by) = &self.partition_by {
            write!(clause, " PARTITION BY {partition_by}").unwrap();
        }
        match (&self.order_by, &self.primary_key) {
            (Some(order_by), _) => write!(clause, " ORDER BY {order_by}").unwrap(),
            // MergeTree refuses a table with neither a sorting nor a primary key.
            // `tuple()` is ClickHouse's explicit "no sorting key". This arm is reached
            // when the order_by requirement was disabled for this asset.
            (None, None) if Self::is_merge_tree_engine(&self.engine) => {
                clause.push_str(" ORDER BY tuple()");
            }
            _ => {}
        }
        if let Some(primary_key) = &self.primary_key {
            write!(clause, " PRIMARY KEY {primary_key}").unwrap();
        }
        if let Some(sample_by) = &self.sample_by {
            write!(clause, " SAMPLE BY {sample_by}").unwrap();
        }
        if let Some(settings) = &self.settings {
            write!(clause, " SETTINGS {settings}").unwrap();
        }
        clause
    }
}

/// Creates or replaces the ClickHouse object described by `asset`.
///
/// * Views are issued as `CREATE OR REPLACE VIEW`.
/// * Tables are rebuilt through `replace_table`.
/// * Materialized views are dropped and recreated with the configured storage clause
///   (plus `POPULATE` when requested).
///
/// # Errors
/// Fails when the asset's ClickHouse headers are invalid (checked before any SQL is
/// sent), or when any issued statement fails.
pub async fn run_asset(
    client: &Client,
    default_database: Option<&str>,
    require_order_by: bool,
    asset: &Asset,
) -> Result<(), ExecutorError> {
    // Validate headers up front so that a bad asset fails before any DDL reaches the server.
    let options = ClickHouseOptions::from_asset(asset, require_order_by)?;

    let target = format!(
        "{}.{}",
        quote_ident(asset.reference().schema()),
        quote_ident(asset.reference().name())
    );
    let body = asset.source().as_str();

    let statement = match asset.kind() {
        AssetKind::Table => {
            return replace_table(client, default_database, asset, &options).await;
        }
        AssetKind::View => format!("CREATE OR REPLACE VIEW {target} AS {body}"),
        AssetKind::MaterializedView => {
            drop_any(client, asset).await?;
            let populate_kw = if options.populate { " POPULATE" } else { "" };
            format!(
                "CREATE MATERIALIZED VIEW {target} {clause}{populate_kw} AS {body}",
                clause = options.table_clause()
            )
        }
    };

    exec(client, asset, &statement).await
}

/// Rebuilds the table behind `asset` from its query and swaps it into place.
///
/// The query result is first materialized into a scratch table (named by
/// `temporary_name`) in the asset's schema, so the live table is only touched once the
/// new data exists.
///
/// * If no live table exists, the scratch table is simply renamed into place.
/// * Otherwise the two are swapped with `EXCHANGE TABLES` and the scratch table, which
///   now holds the old data, is dropped.
/// * If `EXCHANGE` fails (presumably a database engine without EXCHANGE support —
///   confirm against the server), a rename-based fallback is used:
///   live -> backup, scratch -> live, then drop the backup.
///
/// # Errors
/// Returns the first [`ExecutorError`] from any statement outside the deliberate
/// `EXCHANGE` fallback. If moving the scratch table into place fails in the rename path,
/// the previous table is renamed back (best effort) before the original error is
/// returned, so the asset is not left missing.
async fn replace_table(
    client: &Client,
    default_database: Option<&str>,
    asset: &Asset,
    options: &ClickHouseOptions,
) -> Result<(), ExecutorError> {
    let schema = quote_ident(asset.reference().schema());
    let name = quote_ident(asset.reference().name());
    let tmp_name = quote_ident(&temporary_name(default_database, asset));
    let old_name = quote_ident(&old_name(default_database, asset));
    let source = asset.source().as_str();

    let target = format!("{schema}.{name}");
    let scratch = format!("{schema}.{tmp_name}");
    let backup = format!("{schema}.{old_name}");

    // A scratch table left over from an interrupted run would make CREATE fail.
    exec(client, asset, &format!("DROP TABLE IF EXISTS {scratch}")).await?;
    exec(
        client,
        asset,
        &format!("CREATE TABLE {scratch} {} AS {source}", options.table_clause()),
    )
    .await?;

    if !table_exists(client, asset).await? {
        return exec(client, asset, &format!("RENAME TABLE {scratch} TO {target}")).await;
    }

    // Preferred path: an atomic swap. Its error is intentionally discarded because the
    // rename fallback below handles the case where EXCHANGE is unavailable.
    if exec(client, asset, &format!("EXCHANGE TABLES {target} AND {scratch}"))
        .await
        .is_ok()
    {
        // After the swap, the scratch name holds the previous data.
        return exec(client, asset, &format!("DROP TABLE IF EXISTS {scratch}")).await;
    }

    // A backup left by a run that crashed before its final DROP would make the
    // first RENAME below fail on every subsequent run.
    exec(client, asset, &format!("DROP TABLE IF EXISTS {backup}")).await?;
    exec(client, asset, &format!("RENAME TABLE {target} TO {backup}")).await?;
    if let Err(err) = exec(client, asset, &format!("RENAME TABLE {scratch} TO {target}")).await {
        // Best-effort rollback: restore the previous table under its real name so the
        // asset stays queryable. The original failure is what gets reported.
        let _ = exec(client, asset, &format!("RENAME TABLE {backup} TO {target}")).await;
        return Err(err);
    }
    exec(client, asset, &format!("DROP TABLE IF EXISTS {backup}")).await
}

/// Asks ClickHouse (`EXISTS TABLE schema.name`) whether the asset's object exists.
///
/// The server's `UInt8` answer is mapped to `true` only when it equals `1`.
///
/// # Errors
/// Returns [`ExecutorError::Run`] carrying the client error text if the query fails.
async fn table_exists(client: &Client, asset: &Asset) -> Result<bool, ExecutorError> {
    let reference = asset.reference();
    let probe = format!(
        "EXISTS TABLE {}.{}",
        quote_ident(reference.schema()),
        quote_ident(reference.name())
    );

    let flag: u8 = match client.query(&probe).fetch_one::<u8>().await {
        Ok(value) => value,
        Err(err) => {
            return Err(ExecutorError::Run {
                asset: asset.to_string(),
                message: err.to_string(),
            });
        }
    };
    Ok(flag == 1)
}

/// Returns the trimmed value of header `key`, or `None` when it is missing or blank.
fn header(asset: &Asset, key: &str) -> Option<String> {
    let trimmed = asset.header(key)?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}

/// Runs a single statement that returns no rows.
///
/// # Errors
/// Any client error is wrapped in [`ExecutorError::Run`] and tagged with the asset.
async fn exec(client: &Client, asset: &Asset, sql: &str) -> Result<(), ExecutorError> {
    let outcome = client.query(sql).execute().await;
    match outcome {
        Ok(()) => Ok(()),
        Err(err) => Err(ExecutorError::Run {
            asset: asset.to_string(),
            message: err.to_string(),
        }),
    }
}

/// Drops the asset's object with `DROP TABLE IF EXISTS schema.name`.
///
/// This is used before recreating materialized views.
async fn drop_any(client: &Client, asset: &Asset) -> Result<(), ExecutorError> {
    let reference = asset.reference();
    let statement = format!(
        "DROP TABLE IF EXISTS {}.{}",
        quote_ident(reference.schema()),
        quote_ident(reference.name()),
    );
    exec(client, asset, &statement).await
}

/// Wraps `value` in backticks, backslash-escaping any backslash or backtick inside it.
fn quote_ident(value: &str) -> String {
    // Backslashes must be escaped first so the ones added for backticks are not doubled.
    let escaped = value.replace('\\', "\\\\").replace('`', "\\`");
    format!("`{escaped}`")
}

/// Unquoted name of the scratch table: `<name>__hinge_tmp__<object_suffix>`.
fn temporary_name(default_database: Option<&str>, asset: &Asset) -> String {
    let suffix = object_suffix(default_database, asset);
    let base = asset.reference().name();
    format!("{base}__hinge_tmp__{suffix}")
}

/// Unquoted name of the backup table used by the rename fallback:
/// `<name>__hinge_old__<object_suffix>`.
fn old_name(default_database: Option<&str>, asset: &Asset) -> String {
    let suffix = object_suffix(default_database, asset);
    let base = asset.reference().name();
    format!("{base}__hinge_old__{suffix}")
}

/// Deterministic suffix used in an asset's scratch and backup table names.
///
/// It hashes the optional default database, the asset's schema, and its name with
/// 64-bit FNV-1a. A fixed algorithm is used deliberately:
/// `std::collections::hash_map::DefaultHasher` does not guarantee stable output across
/// Rust releases. Leftover `__hinge_tmp__` / `__hinge_old__` tables from an interrupted
/// run can only be found again by name if later builds compute the same suffix.
fn object_suffix(default_database: Option<&str>, asset: &Asset) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    /// Folds `bytes` into an FNV-1a state.
    fn absorb(state: u64, bytes: &[u8]) -> u64 {
        bytes
            .iter()
            .fold(state, |acc, &byte| (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME))
    }

    let reference = asset.reference();
    let schema: &str = reference.schema();
    let name: &str = reference.name();

    [default_database, Some(schema), Some(name)]
        .iter()
        .fold(FNV_OFFSET_BASIS, |state, part| match part {
            // Tag presence and length-prefix each part so that different splits of the
            // same bytes (e.g. "ab"+"c" vs "a"+"bc") hash differently.
            None => absorb(state, &[0]),
            Some(text) => {
                let state = absorb(state, &[1]);
                // usize -> u64 is lossless on every platform Rust supports.
                let state = absorb(state, &(text.len() as u64).to_le_bytes());
                absorb(state, text.as_bytes())
            }
        })
}

#[cfg(test)]
mod tests {
    use super::{ClickHouseOptions, quote_ident};
    use crate::{Asset, AssetKind, AssetReference, AssetSource};
    use std::collections::BTreeMap;

    /// Backticks and backslashes inside an identifier must both be escaped.
    #[test]
    fn quote_ident_escapes_backticks_and_backslashes() {
        let quoted = quote_ident(r#"a\b`c"#);
        assert_eq!(quoted, r#"`a\\b\`c`"#);
    }

    /// A header-less table is rejected in strict mode and accepted otherwise.
    #[test]
    fn options_require_order_by_for_tables() {
        let unordered = Asset::new(
            AssetKind::Table,
            AssetReference::new("mart", "events"),
            AssetSource::new("SELECT 1").unwrap(),
        );

        let strict = ClickHouseOptions::from_asset(&unordered, true);
        let lenient = ClickHouseOptions::from_asset(&unordered, false);
        assert!(strict.is_err());
        assert!(lenient.is_ok());
    }

    /// Every MergeTree-related header ends up in the rendered storage clause.
    #[test]
    fn options_render_merge_tree_clauses() {
        let headers: BTreeMap<String, String> = [
            ("clickhouse.order_by", "(event_date, user_id)"),
            ("clickhouse.partition_by", "toYYYYMM(event_date)"),
            ("clickhouse.primary_key", "user_id"),
            ("clickhouse.settings", "index_granularity = 8192"),
        ]
        .into_iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();

        let events = Asset::with_headers(
            AssetKind::Table,
            AssetReference::new("mart", "events"),
            AssetSource::new("SELECT 1").unwrap(),
            headers,
        );

        let rendered = ClickHouseOptions::from_asset(&events, true)
            .unwrap()
            .table_clause();
        let expected = "ENGINE = MergeTree PARTITION BY toYYYYMM(event_date) ORDER BY (event_date, user_id) PRIMARY KEY user_id SETTINGS index_granularity = 8192";
        assert_eq!(rendered, expected);
    }
}