mod application;
mod domain;
mod infrastructure;
pub use application::{
BuildGraph,
BuildGraphError,
Engine,
Executor,
ExecutorError,
RunAll,
RunBidirectional,
RunDownstream,
RunUpstream,
};
pub use domain::{
Asset,
AssetError,
AssetKind,
AssetReference,
AssetSource,
Graph,
GraphError,
};
pub use infrastructure::{
ClickHouseExecutor,
DuckDbConnectionError, DuckDbExecutor,
PostgresExecutor,
SnowflakeConnectionError, SnowflakeExecutor,
};
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    /// Shared, ordered log of the asset names a [`MockExecutor`] was asked to run.
    type CallLog = Arc<Mutex<Vec<String>>>;

    /// Creates an empty call log.
    fn new_call_log() -> CallLog {
        Arc::new(Mutex::new(Vec::new()))
    }

    /// Builds a `View` asset for `schema.name` whose source is `SELECT 1`.
    fn asset(schema: &str, name: &str) -> Asset {
        Asset::new(
            AssetKind::View,
            AssetReference::new(schema, name),
            AssetSource::new("SELECT 1").unwrap(),
        )
    }

    /// Builds the fixture graph `raw -> staging -> {mart_a, mart_b}` (all in the
    /// `public` schema) and returns it together with its four assets, in that order.
    ///
    /// The ordering tests below expect the source of each arrow to run before its target.
    fn build_graph() -> (Graph, Asset, Asset, Asset, Asset) {
        let raw = asset("public", "raw");
        let staging = asset("public", "staging");
        let mart_a = asset("public", "mart_a");
        let mart_b = asset("public", "mart_b");

        let mut edges: HashMap<Asset, Vec<Asset>> = HashMap::new();
        edges.insert(raw.clone(), vec![staging.clone()]);
        edges.insert(staging.clone(), vec![mart_a.clone(), mart_b.clone()]);
        edges.insert(mart_a.clone(), vec![]);
        edges.insert(mart_b.clone(), vec![]);

        let graph = Graph::build(edges).unwrap();
        (graph, raw, staging, mart_a, mart_b)
    }

    /// Test double for [`Executor`]: records `asset.to_string()` for every asset it
    /// is asked to run, in call order, and always returns `Ok(())`.
    #[derive(Clone)]
    struct MockExecutor {
        calls: CallLog,
    }

    impl MockExecutor {
        /// Creates an executor that appends to the shared `calls` log.
        fn with_log(calls: CallLog) -> Self {
            Self { calls }
        }
    }

    #[async_trait]
    impl Executor for MockExecutor {
        async fn run(&self, asset: &Asset) -> Result<(), ExecutorError> {
            // The guard is a temporary dropped at the end of this statement, so the
            // std mutex is never held across an await point.
            self.calls.lock().unwrap().push(asset.to_string());
            Ok(())
        }
    }

    /// Writes `sql` to `<base>/<schema>/<model>.sql`, creating the schema directory
    /// if it does not exist yet.
    fn write_sql(base: &std::path::Path, schema: &str, model: &str, sql: &str) {
        let dir = base.join(schema);
        std::fs::create_dir_all(&dir).unwrap();
        std::fs::write(dir.join(format!("{model}.sql")), sql).unwrap();
    }

    /// Returns the index of the first item whose `Display` form equals `name`.
    ///
    /// # Panics
    ///
    /// Panics when no item matches, naming the missing item and listing every item
    /// that *was* present, so a failing ordering assertion is easy to diagnose.
    fn position_of<T: std::fmt::Display>(items: &[T], name: &str) -> usize {
        items
            .iter()
            .position(|item| item.to_string() == name)
            .unwrap_or_else(|| {
                let seen: Vec<String> = items.iter().map(ToString::to_string).collect();
                panic!("`{name}` not found; observed order: {seen:?}")
            })
    }

    /// Cross-schema references in SQL bodies become edges between the files.
    #[test]
    fn build_graph_resolves_deps() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        write_sql(root, "raw", "events", "SELECT 1");
        write_sql(root, "staging", "orders", "SELECT id FROM raw.events");
        write_sql(root, "mart", "revenue", "SELECT sum FROM staging.orders");

        let graph = BuildGraph::from_dir(root).unwrap();
        let order = graph.topo_sort();

        assert_eq!(order.len(), 3);
        assert!(
            position_of(&order, "raw.events") < position_of(&order, "staging.orders"),
            "events before orders"
        );
        assert!(
            position_of(&order, "staging.orders") < position_of(&order, "mart.revenue"),
            "orders before revenue"
        );
    }

    /// A `-- @kind:` header overrides the asset kind.
    #[test]
    fn build_graph_kind_header() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        write_sql(root, "mart", "sales", "-- @kind: materialized_view\nSELECT 1");

        let graph = BuildGraph::from_dir(root).unwrap();
        let assets: Vec<_> = graph.topo_sort();

        assert_eq!(assets.len(), 1);
        assert_eq!(assets[0].kind(), AssetKind::MaterializedView);
    }

    /// Arbitrary `-- @key: value` headers are kept and readable via `Asset::header`.
    #[test]
    fn build_graph_preserves_metadata_headers() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        write_sql(
            root,
            "mart",
            "events",
            "-- @kind: table\n-- @clickhouse.order_by: (event_date, user_id)\nSELECT 1",
        );

        let graph = BuildGraph::from_dir(root).unwrap();
        let assets: Vec<_> = graph.topo_sort();

        assert_eq!(assets.len(), 1);
        assert_eq!(
            assets[0].header("clickhouse.order_by"),
            Some("(event_date, user_id)")
        );
    }

    /// `raw_db.source` has no matching file under `root`, so it must not become a node.
    #[test]
    fn build_graph_ignores_external_refs() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        write_sql(root, "staging", "orders", "SELECT id FROM raw_db.source");

        let graph = BuildGraph::from_dir(root).unwrap();
        assert_eq!(graph.topo_sort().len(), 1);
    }

    /// `staging.orders` references `raw.events` twice (self-join); the graph must
    /// still hold exactly two assets, correctly ordered.
    #[test]
    fn build_graph_deduplicates_deps() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        write_sql(root, "raw", "events", "SELECT 1");
        write_sql(
            root,
            "staging",
            "orders",
            "SELECT a.id, b.val FROM raw.events a JOIN raw.events b ON a.id = b.id",
        );

        let graph = BuildGraph::from_dir(root).unwrap();
        let order = graph.topo_sort();

        assert_eq!(order.len(), 2);
        assert!(
            position_of(&order, "raw.events") < position_of(&order, "staging.orders"),
            "events before orders"
        );
    }

    #[test]
    fn topo_sort_dependencies_before_dependents() {
        let (graph, raw, staging, mart_a, mart_b) = build_graph();
        let order = graph.topo_sort();
        let pos = |a: &Asset| position_of(&order, &a.to_string());

        assert!(pos(&raw) < pos(&staging), "raw must come before staging");
        assert!(pos(&staging) < pos(&mart_a), "staging must come before mart_a");
        assert!(pos(&staging) < pos(&mart_b), "staging must come before mart_b");
        assert_eq!(order.len(), 4);
    }

    #[tokio::test]
    async fn run_all_executes_in_topo_order() {
        let (graph, ..) = build_graph();
        let calls = new_call_log();
        let use_case = RunAll::new(graph, MockExecutor::with_log(Arc::clone(&calls)));
        use_case.execute().await.unwrap();

        let order = calls.lock().unwrap().clone();
        assert_eq!(order.len(), 4, "executed: {order:?}");
        assert!(position_of(&order, "public.raw") < position_of(&order, "public.staging"));
        assert!(position_of(&order, "public.staging") < position_of(&order, "public.mart_a"));
        assert!(position_of(&order, "public.staging") < position_of(&order, "public.mart_b"));
    }

    /// `plan()` lists every asset without executing anything.
    #[test]
    fn run_all_plan_covers_every_asset() {
        let (graph, ..) = build_graph();
        let use_case = RunAll::new(graph, MockExecutor::with_log(new_call_log()));
        assert_eq!(use_case.plan().len(), 4);
    }

    #[tokio::test]
    async fn run_upstream_of_mart_a() {
        let (graph, _raw, _staging, mart_a, _mart_b) = build_graph();
        let calls = new_call_log();
        let use_case = RunUpstream::new(graph, MockExecutor::with_log(Arc::clone(&calls)));
        use_case.execute(&mart_a).await.unwrap();

        let order = calls.lock().unwrap().clone();
        assert_eq!(order.len(), 3, "executed: {order:?}");
        assert!(
            !order.iter().any(|name| name == "public.mart_b"),
            "sibling mart_b must not run: {order:?}"
        );
        assert!(position_of(&order, "public.raw") < position_of(&order, "public.staging"));
        assert!(position_of(&order, "public.staging") < position_of(&order, "public.mart_a"));
        assert_eq!(order.last().map(String::as_str), Some("public.mart_a"));
    }

    #[tokio::test]
    async fn run_downstream_from_staging() {
        let (graph, _raw, staging, _mart_a, _mart_b) = build_graph();
        let calls = new_call_log();
        let use_case = RunDownstream::new(graph, MockExecutor::with_log(Arc::clone(&calls)));
        use_case.execute(&staging).await.unwrap();

        let order = calls.lock().unwrap().clone();
        assert_eq!(order.len(), 3, "executed: {order:?}");
        assert!(
            !order.iter().any(|name| name == "public.raw"),
            "upstream raw must not run: {order:?}"
        );
        assert_eq!(order.first().map(String::as_str), Some("public.staging"));
        assert!(position_of(&order, "public.staging") < position_of(&order, "public.mart_a"));
        assert!(position_of(&order, "public.staging") < position_of(&order, "public.mart_b"));
    }

    #[tokio::test]
    async fn run_bidirectional_from_staging() {
        let (graph, _raw, staging, _mart_a, _mart_b) = build_graph();
        let calls = new_call_log();
        let use_case = RunBidirectional::new(graph, MockExecutor::with_log(Arc::clone(&calls)));
        use_case.execute(&staging).await.unwrap();

        let order = calls.lock().unwrap().clone();
        assert_eq!(order.len(), 4, "executed: {order:?}");
        assert!(
            position_of(&order, "public.raw") < position_of(&order, "public.staging"),
            "raw before staging"
        );
        assert!(
            position_of(&order, "public.staging") < position_of(&order, "public.mart_a"),
            "staging before mart_a"
        );
        assert!(
            position_of(&order, "public.staging") < position_of(&order, "public.mart_b"),
            "staging before mart_b"
        );
    }
}