use crate::{Asset, AssetKind, ExecutorError};
use snowflake_api::SnowflakeApi;
use std::fmt::Write;
pub async fn run_asset(api: &SnowflakeApi, asset: &Asset) -> Result<(), ExecutorError> {
let sql = build_ddl(asset)?;
api.exec(&sql)
.await
.map(|_| ())
.map_err(|e| ExecutorError::Run {
asset: asset.to_string(),
message: e.to_string(),
})
}
fn build_ddl(asset: &Asset) -> Result<String, ExecutorError> {
let schema = quote_ident(asset.reference().schema());
let name = quote_ident(asset.reference().name());
let source = asset.source().as_str();
let sql = match asset.kind() {
AssetKind::View => {
format!("CREATE OR REPLACE VIEW {schema}.{name} AS\n{source}")
}
AssetKind::Table => {
let mut sql = format!("CREATE OR REPLACE TABLE {schema}.{name}");
if let Some(cluster_by) = asset.header("snowflake.cluster_by") {
write!(sql, " CLUSTER BY ({cluster_by})").unwrap();
}
write!(sql, " AS\n{source}").unwrap();
sql
}
AssetKind::MaterializedView => {
format!("CREATE OR REPLACE MATERIALIZED VIEW {schema}.{name} AS\n{source}")
}
};
Ok(sql)
}
fn quote_ident(value: &str) -> String {
format!("\"{}\"", value.replace('"', "\"\""))
}
#[cfg(test)]
mod tests {
use super::{build_ddl, quote_ident};
use crate::{Asset, AssetKind, AssetReference, AssetSource};
use std::collections::BTreeMap;
fn asset(kind: AssetKind, schema: &str, name: &str) -> Asset {
Asset::new(
kind,
AssetReference::new(schema, name),
AssetSource::new("SELECT 1").unwrap(),
)
}
#[test]
fn quote_ident_escapes_double_quotes() {
assert_eq!(quote_ident(r#"a"b"#), r#""a""b""#);
}
#[test]
fn build_ddl_view() {
let sql = build_ddl(&asset(AssetKind::View, "staging", "orders")).unwrap();
assert!(sql.starts_with("CREATE OR REPLACE VIEW \"staging\".\"orders\" AS"));
}
#[test]
fn build_ddl_table_without_cluster() {
let sql = build_ddl(&asset(AssetKind::Table, "mart", "revenue")).unwrap();
assert!(sql.starts_with("CREATE OR REPLACE TABLE \"mart\".\"revenue\" AS"));
assert!(!sql.contains("CLUSTER BY"));
}
#[test]
fn build_ddl_table_with_cluster() {
let mut headers = BTreeMap::new();
headers.insert("snowflake.cluster_by".to_string(), "event_date, user_id".to_string());
let a = Asset::with_headers(
AssetKind::Table,
AssetReference::new("mart", "events"),
AssetSource::new("SELECT 1").unwrap(),
headers,
);
let sql = build_ddl(&a).unwrap();
assert!(sql.contains("CLUSTER BY (event_date, user_id)"));
}
#[test]
fn build_ddl_materialized_view() {
let sql = build_ddl(&asset(AssetKind::MaterializedView, "mart", "mv")).unwrap();
assert!(sql.starts_with("CREATE OR REPLACE MATERIALIZED VIEW"));
}
}