mod executor;
mod utils;
pub use executor::{DuckDbConnectionError, DuckDbExecutor};
use crate::application::{Executor, ExecutorError};
use crate::domain::Asset;
use async_trait::async_trait;
use std::time::Instant;
use super::utils::{fmt_duration, SLOW_THRESHOLD};
use utils::run_asset;
#[async_trait]
impl Executor for DuckDbExecutor {
    /// Runs `asset` through [`run_asset`] on this executor's connection and
    /// reports how long the call took.
    ///
    /// A `debug` event is emitted before the work starts. After a successful
    /// run, either an `info` event ("done") or — when the elapsed time reaches
    /// `SLOW_THRESHOLD` — a `warn` event ("slow query") is emitted.
    ///
    /// # Errors
    ///
    /// Any error produced by `run_asset` is propagated via `?`; in that case
    /// no completion event is logged by this method.
    async fn run(&self, asset: &Asset) -> Result<(), ExecutorError> {
        tracing::debug!(asset = %asset, kind = %asset.kind(), "starting");

        let timer = Instant::now();
        run_asset(self.conn(), asset).await?;
        let took = timer.elapsed();

        // Common case: the run finished below the slow threshold.
        if took < SLOW_THRESHOLD {
            tracing::info!(
                asset = %asset,
                kind = %asset.kind(),
                elapsed = %fmt_duration(took),
                "done"
            );
            return Ok(());
        }

        // At or above the threshold: surface the run as a warning and include
        // the threshold itself so the log line is self-explanatory.
        tracing::warn!(
            asset = %asset,
            kind = %asset.kind(),
            elapsed = %fmt_duration(took),
            threshold = %fmt_duration(SLOW_THRESHOLD),
            "slow query"
        );
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::DuckDbExecutor;
    use crate::{Asset, AssetKind, AssetReference, AssetSource, Executor};

    /// Connection URL used by every test in this module.
    const MEMORY_URL: &str = "duckdb://:memory:";

    /// Builds a fresh executor from [`MEMORY_URL`], panicking if the
    /// connection cannot be established.
    fn memory_executor() -> DuckDbExecutor {
        DuckDbExecutor::from_url(MEMORY_URL).unwrap()
    }

    /// Assembles an [`Asset`] of the given kind at `schema.name` backed by `sql`.
    ///
    /// Panics if `AssetSource::new` rejects `sql`.
    fn asset(kind: AssetKind, schema: &str, name: &str, sql: &str) -> Asset {
        let reference = AssetReference::new(schema, name);
        let source = AssetSource::new(sql).unwrap();
        Asset::new(kind, reference, source)
    }

    /// Running a `View` asset succeeds.
    #[tokio::test]
    async fn run_creates_view_in_memory() {
        let view = asset(AssetKind::View, "staging", "numbers", "SELECT 1 AS n");
        memory_executor()
            .run(&view)
            .await
            .expect("should create view");
    }

    /// Running a `Table` asset succeeds.
    #[tokio::test]
    async fn run_creates_table_in_memory() {
        let table = asset(AssetKind::Table, "mart", "totals", "SELECT 42 AS total");
        memory_executor()
            .run(&table)
            .await
            .expect("should create table");
    }

    /// Running a `MaterializedView` asset succeeds.
    #[tokio::test]
    async fn run_materialized_view_as_table() {
        let mv = asset(
            AssetKind::MaterializedView,
            "mart",
            "mv_sales",
            "SELECT 1 AS sale",
        );
        memory_executor()
            .run(&mv)
            .await
            .expect("materialized view should run as table");
    }

    /// A view that selects from a previously run table succeeds when both are
    /// run, upstream first, on the same executor.
    #[tokio::test]
    async fn run_respects_dependency_order() {
        let db = memory_executor();

        let upstream = asset(AssetKind::Table, "raw", "events", "SELECT 1 AS id");
        let downstream = asset(
            AssetKind::View,
            "staging",
            "events",
            "SELECT id FROM raw.events",
        );

        db.run(&upstream).await.expect("raw.events");
        db.run(&downstream).await.expect("staging.events");
    }
}