use crate::{Asset, AssetKind, ExecutorError};
use duckdb::Connection;
use std::sync::{Arc, Mutex};
/// Materialises `asset` by executing its generated SQL batch on `conn`.
///
/// The batch is rendered with `build_batch` and then executed inside
/// `tokio::task::spawn_blocking`, so the synchronous `execute_batch` call
/// does not run on the async task itself.
///
/// # Errors
///
/// Returns [`ExecutorError::Run`], labelled with the asset's `Display`
/// output, when either:
/// - `execute_batch` fails (the message is the stringified database error), or
/// - awaiting the blocking task fails (the message is prefixed with
///   `blocking task panicked:`).
pub async fn run_asset(
    conn: Arc<Mutex<Connection>>,
    asset: &Asset,
) -> Result<(), ExecutorError> {
    let sql = build_batch(asset);
    let label = asset.to_string();

    let worker = tokio::task::spawn_blocking(move || {
        // A poisoned mutex is not treated as fatal: take the guard out of the
        // poison error and keep using the connection.
        let guard = match conn.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        guard.execute_batch(&sql).map_err(|err| err.to_string())
    });

    // Collapse both failure modes into one message so the error value is
    // built in a single place and `label` is moved exactly once.
    let message = match worker.await {
        Ok(Ok(())) => return Ok(()),
        Ok(Err(db_message)) => db_message,
        Err(join_err) => format!("blocking task panicked: {join_err}"),
    };

    Err(ExecutorError::Run {
        asset: label,
        message,
    })
}
/// Renders the two-statement SQL batch that (re)creates `asset`.
///
/// The first statement is `CREATE SCHEMA IF NOT EXISTS <schema>`; the second
/// is `CREATE OR REPLACE VIEW|TABLE <schema>.<name> AS` followed by the
/// asset's source on the next line. The statements are separated by `;\n`.
/// Schema and name are passed through `quote_ident` before interpolation;
/// the source text is inserted verbatim.
///
/// `AssetKind::MaterializedView` is emitted as a `TABLE`, and a debug event
/// is logged when that substitution happens.
fn build_batch(asset: &Asset) -> String {
    let reference = asset.reference();
    let schema = quote_ident(reference.schema());
    let name = quote_ident(reference.name());
    let body = asset.source().as_str();

    // Only the object keyword differs between kinds, so select that and
    // assemble the statement text once below.
    let object = match asset.kind() {
        AssetKind::View => "VIEW",
        AssetKind::Table => "TABLE",
        AssetKind::MaterializedView => {
            tracing::debug!(
                "DuckDB: materialised_view '{}' will be created as a table",
                asset
            );
            "TABLE"
        }
    };

    format!(
        "CREATE SCHEMA IF NOT EXISTS {schema};\nCREATE OR REPLACE {object} {schema}.{name} AS\n{body}"
    )
}
/// Wraps `value` in double quotes for use as a quoted SQL identifier.
///
/// Every `"` inside `value` is doubled (`"` becomes `""`); all other
/// characters are copied through unchanged. An empty input yields `""`.
fn quote_ident(value: &str) -> String {
    // Two extra bytes for the surrounding quotes; embedded quotes grow the
    // buffer on demand.
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('"');
    for ch in value.chars() {
        if ch == '"' {
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
#[cfg(test)]
mod tests {
    use super::{build_batch, quote_ident};
    use crate::{Asset, AssetKind, AssetReference, AssetSource};

    /// Renders the batch for a fixture asset of the given kind and location
    /// whose source is the fixed query `SELECT 1 AS n`.
    fn batch_for(kind: AssetKind, schema: &str, name: &str) -> String {
        let source = AssetSource::new("SELECT 1 AS n").unwrap();
        let fixture = Asset::new(kind, AssetReference::new(schema, name), source);
        build_batch(&fixture)
    }

    /// Embedded double quotes are doubled and the result is wrapped in quotes.
    #[test]
    fn quote_ident_escapes_double_quotes() {
        let quoted = quote_ident(r#"my"schema"#);
        assert_eq!(quoted, r#""my""schema""#);
    }

    /// A view asset ensures its schema and emits a VIEW statement only.
    #[test]
    fn build_batch_view() {
        let sql = batch_for(AssetKind::View, "staging", "orders");
        assert!(sql.contains(r#"CREATE SCHEMA IF NOT EXISTS "staging""#));
        assert!(sql.contains(r#"CREATE OR REPLACE VIEW "staging"."orders" AS"#));
        assert!(!sql.contains("TABLE"));
    }

    /// A table asset emits a TABLE statement with a quoted, qualified name.
    #[test]
    fn build_batch_table() {
        let sql = batch_for(AssetKind::Table, "mart", "revenue");
        assert!(sql.contains(r#"CREATE OR REPLACE TABLE "mart"."revenue" AS"#));
    }

    /// A materialized view is emitted as a TABLE statement.
    #[test]
    fn build_batch_materialized_view_maps_to_table() {
        let sql = batch_for(AssetKind::MaterializedView, "mart", "mv");
        assert!(sql.contains("CREATE OR REPLACE TABLE"));
        assert!(!sql.contains("MATERIALIZED VIEW"));
    }
}