use crate::{Asset, AssetKind, ExecutorError};
use clickhouse::Client;
use std::fmt::Write;
/// DDL options for a ClickHouse table or materialized view, read from the asset's
/// `-- @clickhouse.*` headers by `ClickHouseOptions::from_asset`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ClickHouseOptions {
    /// Engine text rendered after `ENGINE = ` (`clickhouse.engine`; `MergeTree` when the header
    /// is absent).
    engine: String,
    /// Sorting key rendered as ` ORDER BY ...` (`clickhouse.order_by`).
    order_by: Option<String>,
    /// Rendered as ` PRIMARY KEY ...` (`clickhouse.primary_key`).
    primary_key: Option<String>,
    /// Rendered as ` PARTITION BY ...` (`clickhouse.partition_by`).
    partition_by: Option<String>,
    /// Rendered as ` SAMPLE BY ...` (`clickhouse.sample_by`).
    sample_by: Option<String>,
    /// Raw text rendered after ` SETTINGS ` (`clickhouse.settings`).
    settings: Option<String>,
    /// Whether a materialized view is created with ` POPULATE` (`clickhouse.populate`).
    populate: bool,
}
impl ClickHouseOptions {
    /// Collects the `-- @clickhouse.*` headers of `asset` into DDL options.
    ///
    /// Every header value is trimmed, and a blank value is treated as absent. In particular, a
    /// missing or blank `clickhouse.engine` falls back to `MergeTree`. `clickhouse.populate` is
    /// enabled by `1`, `true` or `yes` (case-insensitive) and disabled by anything else.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutorError::Run`] when all of the following hold:
    /// - `require_order_by` is set;
    /// - the asset is a table or materialized view on a MergeTree-family engine;
    /// - it has no `clickhouse.order_by` header.
    ///
    /// Other engines (`Memory`, `Log`, ...) are exempt. ClickHouse rejects an `ORDER BY` clause
    /// for them, so demanding one would make the asset impossible to build.
    fn from_asset(asset: &Asset, require_order_by: bool) -> Result<Self, ExecutorError> {
        // Routed through `header` so a blank `@clickhouse.engine:` line cannot produce the
        // invalid clause `ENGINE = `.
        let engine =
            header(asset, "clickhouse.engine").unwrap_or_else(|| "MergeTree".to_string());
        let order_by = header(asset, "clickhouse.order_by");
        if require_order_by
            && matches!(asset.kind(), AssetKind::Table | AssetKind::MaterializedView)
            && Self::is_merge_tree_engine(&engine)
            && order_by.is_none()
        {
            return Err(ExecutorError::Run {
                asset: asset.to_string(),
                message: "missing '-- @clickhouse.order_by: ...' header; use ClickHouseExecutor::allow_unordered_tables() only for small or intentionally unsorted assets".to_string(),
            });
        }
        Ok(Self {
            engine,
            order_by,
            primary_key: header(asset, "clickhouse.primary_key"),
            partition_by: header(asset, "clickhouse.partition_by"),
            sample_by: header(asset, "clickhouse.sample_by"),
            settings: header(asset, "clickhouse.settings"),
            populate: asset.header("clickhouse.populate").is_some_and(|v| {
                matches!(v.trim().to_ascii_lowercase().as_str(), "1" | "true" | "yes")
            }),
        })
    }

    /// Renders the storage clause placed between the object name and `AS <select>`.
    ///
    /// The output is `ENGINE = <engine>`, followed by whichever of these are set, in this order:
    /// ` PARTITION BY`, ` ORDER BY`, ` PRIMARY KEY`, ` SAMPLE BY`, ` SETTINGS`.
    ///
    /// A MergeTree-family engine with neither an order_by nor a primary_key gets
    /// ` ORDER BY tuple()`. That is ClickHouse's spelling of "no sorting key"; without it,
    /// ClickHouse rejects the DDL. This is the path taken for assets admitted by
    /// `allow_unordered_tables()`.
    fn table_clause(&self) -> String {
        let mut clause = format!("ENGINE = {}", self.engine);
        // `write!` into a `String` is infallible, so these unwraps never panic.
        if let Some(partition_by) = &self.partition_by {
            write!(clause, " PARTITION BY {partition_by}").unwrap();
        }
        match (&self.order_by, &self.primary_key) {
            (Some(order_by), _) => write!(clause, " ORDER BY {order_by}").unwrap(),
            // PRIMARY KEY alone doubles as the sorting key, so only the both-missing case needs
            // the explicit empty key.
            (None, None) if Self::is_merge_tree_engine(&self.engine) => {
                clause.push_str(" ORDER BY tuple()");
            }
            _ => {}
        }
        if let Some(primary_key) = &self.primary_key {
            write!(clause, " PRIMARY KEY {primary_key}").unwrap();
        }
        if let Some(sample_by) = &self.sample_by {
            write!(clause, " SAMPLE BY {sample_by}").unwrap();
        }
        if let Some(settings) = &self.settings {
            write!(clause, " SETTINGS {settings}").unwrap();
        }
        clause
    }

    /// Whether `engine` names a MergeTree-family engine, e.g. `MergeTree`,
    /// `ReplacingMergeTree(ver)` or `ReplicatedMergeTree('/path', '{replica}')`.
    ///
    /// The check uses the engine name before any argument list.
    fn is_merge_tree_engine(engine: &str) -> bool {
        engine
            .split_once('(')
            .map_or(engine, |(name, _)| name)
            .trim()
            .ends_with("MergeTree")
    }
}
/// Builds one asset in ClickHouse, choosing the DDL strategy from its kind.
///
/// - Views are redefined in place with `CREATE OR REPLACE VIEW`.
/// - Tables are rebuilt through a scratch table and swapped in (see `replace_table`).
/// - Materialized views are dropped and recreated with the header-derived storage clause,
///   plus ` POPULATE` when requested.
///
/// `default_database` only feeds the scratch-table names used for tables. `require_order_by`
/// is forwarded to header validation.
///
/// # Errors
///
/// Returns [`ExecutorError`] when header validation fails or any issued statement fails.
pub async fn run_asset(
    client: &Client,
    default_database: Option<&str>,
    require_order_by: bool,
    asset: &Asset,
) -> Result<(), ExecutorError> {
    let options = ClickHouseOptions::from_asset(asset, require_order_by)?;
    let target = format!(
        "{}.{}",
        quote_ident(asset.reference().schema()),
        quote_ident(asset.reference().name())
    );
    let source = asset.source().as_str();
    match asset.kind() {
        AssetKind::Table => replace_table(client, default_database, asset, &options).await,
        AssetKind::View => {
            let ddl = format!("CREATE OR REPLACE VIEW {target} AS {source}");
            exec(client, asset, &ddl).await
        }
        AssetKind::MaterializedView => {
            drop_any(client, asset).await?;
            let populate = options.populate.then_some(" POPULATE").unwrap_or_default();
            let ddl = format!(
                "CREATE MATERIALIZED VIEW {target} {}{populate} AS {source}",
                options.table_clause()
            );
            exec(client, asset, &ddl).await
        }
    }
}
/// Rebuilds `asset` as a table: its query is materialised into a scratch table, which then
/// replaces the live table.
///
/// Steps:
/// 1. Drop any scratch table left by an earlier run, then `CREATE TABLE <tmp> ... AS <source>`.
/// 2. If there is no live table yet, rename the scratch table into place.
/// 3. Otherwise try `EXCHANGE TABLES live AND tmp`, then drop `tmp`, which now holds the
///    previous contents.
/// 4. If `EXCHANGE` fails (presumably because the database engine does not support it), fall
///    back to renaming `live -> old` and `tmp -> live`, then drop `old`.
///
/// # Errors
///
/// Returns [`ExecutorError::Run`] from the first failing statement. Cleanup statements issued
/// after a failure are best-effort; their own errors are discarded so the original error is
/// reported.
async fn replace_table(
    client: &Client,
    default_database: Option<&str>,
    asset: &Asset,
    options: &ClickHouseOptions,
) -> Result<(), ExecutorError> {
    let schema = quote_ident(asset.reference().schema());
    let live = format!("{schema}.{}", quote_ident(asset.reference().name()));
    let tmp = format!(
        "{schema}.{}",
        quote_ident(&temporary_name(default_database, asset))
    );
    let old = format!("{schema}.{}", quote_ident(&old_name(default_database, asset)));
    let source = asset.source().as_str();

    let drop_tmp = format!("DROP TABLE IF EXISTS {tmp}");
    exec(client, asset, &drop_tmp).await?;

    let create_tmp = format!("CREATE TABLE {tmp} {} AS {source}", options.table_clause());
    if let Err(err) = exec(client, asset, &create_tmp).await {
        // CREATE ... AS SELECT may leave a partially-filled table behind when the SELECT fails;
        // clean it up now rather than waiting for the next run.
        let _ = exec(client, asset, &drop_tmp).await;
        return Err(err);
    }

    if !table_exists(client, asset).await? {
        return exec(client, asset, &format!("RENAME TABLE {tmp} TO {live}")).await;
    }

    // A single EXCHANGE swaps both names. On success the scratch name holds the previous
    // contents, which are no longer needed. On failure, fall through to the rename path.
    let exchange = format!("EXCHANGE TABLES {live} AND {tmp}");
    if exec(client, asset, &exchange).await.is_ok() {
        return exec(client, asset, &drop_tmp).await;
    }

    // A run interrupted inside this fallback can leave `old` behind, which would make the first
    // RENAME fail on every later run. `table_exists` just confirmed the live table, so whatever
    // sits under `old` is stale and safe to drop.
    let drop_old = format!("DROP TABLE IF EXISTS {old}");
    exec(client, asset, &drop_old).await?;
    exec(client, asset, &format!("RENAME TABLE {live} TO {old}")).await?;
    if let Err(err) = exec(client, asset, &format!("RENAME TABLE {tmp} TO {live}")).await {
        // Put the previous table back so the asset is not left missing.
        let _ = exec(client, asset, &format!("RENAME TABLE {old} TO {live}")).await;
        return Err(err);
    }
    exec(client, asset, &drop_old).await
}
/// Asks ClickHouse (`EXISTS TABLE`) whether the asset's object is present.
///
/// Returns `true` only when the query yields `1`.
///
/// # Errors
///
/// Returns [`ExecutorError::Run`] carrying the client error text if the query fails.
async fn table_exists(client: &Client, asset: &Asset) -> Result<bool, ExecutorError> {
    let query = format!(
        "EXISTS TABLE {}.{}",
        quote_ident(asset.reference().schema()),
        quote_ident(asset.reference().name())
    );
    let flag = client
        .query(&query)
        .fetch_one::<u8>()
        .await
        .map_err(|err| ExecutorError::Run {
            asset: asset.to_string(),
            message: err.to_string(),
        })?;
    Ok(flag == 1)
}
/// Returns the trimmed value of header `key`, or `None` when the header is absent or contains
/// only whitespace.
fn header(asset: &Asset, key: &str) -> Option<String> {
    let value = asset.header(key)?.trim();
    (!value.is_empty()).then(|| value.to_owned())
}
/// Executes `sql` as a statement whose result rows are not read.
///
/// # Errors
///
/// Returns [`ExecutorError::Run`], tagged with the asset's display form and the client error
/// text, when execution fails.
async fn exec(client: &Client, asset: &Asset, sql: &str) -> Result<(), ExecutorError> {
    match client.query(sql).execute().await {
        Ok(()) => Ok(()),
        Err(err) => Err(ExecutorError::Run {
            asset: asset.to_string(),
            message: err.to_string(),
        }),
    }
}
/// Issues `DROP TABLE IF EXISTS` for the asset's object.
///
/// `run_asset` uses it before recreating a materialized view. The name suggests it is meant
/// for any object kind, relying on ClickHouse's `DROP TABLE` also accepting views.
async fn drop_any(client: &Client, asset: &Asset) -> Result<(), ExecutorError> {
    let statement = format!(
        "DROP TABLE IF EXISTS {}.{}",
        quote_ident(asset.reference().schema()),
        quote_ident(asset.reference().name())
    );
    exec(client, asset, &statement).await
}
/// Wraps `value` in backticks so it can be spliced into SQL as a ClickHouse identifier.
///
/// Backslashes and backticks inside the name are each prefixed with a backslash.
fn quote_ident(value: &str) -> String {
    let escaped: String = value
        .chars()
        .flat_map(|ch| {
            let needs_escape = ch == '\\' || ch == '`';
            needs_escape
                .then_some('\\')
                .into_iter()
                .chain(std::iter::once(ch))
        })
        .collect();
    format!("`{escaped}`")
}
/// Name of the scratch table a rebuilt table is materialised into before it is swapped in.
fn temporary_name(default_database: Option<&str>, asset: &Asset) -> String {
    scratch_name("__hinge_tmp__", default_database, asset)
}

/// Name the live table is moved to during `replace_table`'s rename fallback.
fn old_name(default_database: Option<&str>, asset: &Asset) -> String {
    scratch_name("__hinge_old__", default_database, asset)
}

/// Builds `<asset name><infix><suffix>`, the naming scheme shared by both scratch tables.
fn scratch_name(infix: &str, default_database: Option<&str>, asset: &Asset) -> String {
    let suffix = object_suffix(default_database, asset);
    format!("{}{infix}{suffix}", asset.reference().name())
}
/// Deterministic numeric suffix that makes scratch-table names unique per
/// (default database, schema, name).
///
/// This uses 64-bit FNV-1a instead of `DefaultHasher`. The standard library documents that
/// `DefaultHasher`'s algorithm may change between Rust releases, but these names must stay the
/// same across builds. `replace_table` drops leftover scratch tables by recomputing their
/// names, so a name that changes after a toolchain upgrade would orphan them.
fn object_suffix(default_database: Option<&str>, asset: &Asset) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    /// Folds `bytes` into `hash` with the FNV-1a step.
    fn feed(hash: u64, bytes: &[u8]) -> u64 {
        bytes
            .iter()
            .fold(hash, |h, &byte| (h ^ u64::from(byte)).wrapping_mul(FNV_PRIME))
    }

    /// Feeds one string field terminated by 0xFF, a byte that never occurs in UTF-8. This keeps
    /// e.g. ("ab", "c") and ("a", "bc") distinct.
    fn feed_field(hash: u64, field: &str) -> u64 {
        feed(feed(hash, field.as_bytes()), &[0xff])
    }

    // Tag the Option so `None` and `Some("")` hash differently.
    let hash = match default_database {
        Some(database) => feed_field(feed(FNV_OFFSET_BASIS, &[1]), database),
        None => feed(FNV_OFFSET_BASIS, &[0]),
    };
    let hash = feed_field(hash, asset.reference().schema());
    feed_field(hash, asset.reference().name())
}
#[cfg(test)]
mod tests {
    use super::{ClickHouseOptions, quote_ident};
    use crate::{Asset, AssetKind, AssetReference, AssetSource};
    use std::collections::BTreeMap;

    /// Backslashes and backticks inside an identifier must be escaped inside the backticks.
    #[test]
    fn quote_ident_escapes_backticks_and_backslashes() {
        let input = r#"a\b`c"#;
        let expected = r#"`a\\b\`c`"#;
        assert_eq!(quote_ident(input), expected);
    }

    /// A table without an order_by header is rejected in strict mode and accepted otherwise.
    #[test]
    fn options_require_order_by_for_tables() {
        let asset = Asset::new(
            AssetKind::Table,
            AssetReference::new("mart", "events"),
            AssetSource::new("SELECT 1").unwrap(),
        );
        let strict = ClickHouseOptions::from_asset(&asset, true);
        let lenient = ClickHouseOptions::from_asset(&asset, false);
        assert!(strict.is_err(), "strict mode should reject an unordered table");
        assert!(lenient.is_ok(), "lenient mode should accept an unordered table");
    }

    /// All header-driven clauses are rendered in the expected order.
    #[test]
    fn options_render_merge_tree_clauses() {
        let headers: BTreeMap<String, String> = [
            ("clickhouse.order_by", "(event_date, user_id)"),
            ("clickhouse.partition_by", "toYYYYMM(event_date)"),
            ("clickhouse.primary_key", "user_id"),
            ("clickhouse.settings", "index_granularity = 8192"),
        ]
        .iter()
        .map(|&(key, value)| (key.to_owned(), value.to_owned()))
        .collect();
        let asset = Asset::with_headers(
            AssetKind::Table,
            AssetReference::new("mart", "events"),
            AssetSource::new("SELECT 1").unwrap(),
            headers,
        );
        let clause = ClickHouseOptions::from_asset(&asset, true)
            .unwrap()
            .table_clause();
        assert_eq!(
            clause,
            "ENGINE = MergeTree PARTITION BY toYYYYMM(event_date) ORDER BY (event_date, user_id) PRIMARY KEY user_id SETTINGS index_granularity = 8192"
        );
    }
}