use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;

use async_recursion::async_recursion;
use async_trait::async_trait;
use aws_config::BehaviorVersion;
use datafusion::arrow::array::{Array, Int64Array, UInt64Array};
use datafusion::catalog::view::ViewTable;
use datafusion::catalog::{
    CatalogProvider as DFCatalogProvider, MemoryCatalogProvider, MemorySchemaProvider,
    TableProvider,
};
use datafusion::common::TableReference;
use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{ident, Expr, LogicalPlanBuilder};
use datafusion::prelude::SessionContext;
use datafusion_functions::core::expr_fn as core_fn;
use hamelin_executor::config::{
    CatalogConfig, CatalogEntry, DatasetConfig, DatasetStore, ResolvedDataset,
};
use hamelin_executor::executor::{CompiledQuery, Executor, ExecutorError};
use hamelin_executor::results::ResultSet;
use hamelin_lib::catalog::{CatalogProvider, Column};
use hamelin_lib::err::{ContextualTranslationErrors, WithTranslationErrors};
use hamelin_lib::interner::Interner;
use hamelin_lib::parse_with_options;
use hamelin_lib::provider::EnvironmentProvider;
use hamelin_lib::tree::ast::expression::Expression;
use hamelin_lib::tree::ast::identifier::{CompoundIdentifier, Identifier, SimpleIdentifier};
use hamelin_lib::tree::ast::ParseWithErrors;
use hamelin_lib::tree::options::{ParseOptions, TypeCheckOptions};
use hamelin_lib::type_check_with_options;
use hamelin_translation::IRStatement;

use crate::arrow::{arrow_batches_to_result_set, arrow_schema_to_columns};
use crate::config::{extract_s3_buckets, register_s3_bucket, resolve_files, resolve_mem};
use crate::iceberg::{build_iceberg_rest_catalog, resolve_iceberg_metadata, resolve_iceberg_table};
use crate::statement::{translate_statement, TranslatedStatement};

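/// A dataset resolved to a concrete DataFusion [`TableProvider`], paired with
/// the identifier it was requested under and its column schema.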
struct ResolvedTable {
identifier: Identifier,
columns: Vec<Column>,
provider: Arc<dyn TableProvider>,
}
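/// Configuration for a [`DataFusionExecutor`]: dataset definitions nested by
/// catalog, schema, and table name, plus named catalog entries.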
#[derive(Default, Clone)]
pub struct DataFusionExecutorConfig {
pub datasets: HashMap<String, HashMap<String, HashMap<String, DatasetConfig>>>,
pub catalogs: HashMap<String, CatalogEntry>,
}
impl DataFusionExecutorConfig {
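    /// Registers a dataset under a two-part (`schema.table`) or three-part
    /// (`catalog.schema.table`) identifier. Two-part identifiers land in the
    /// `default` catalog, which is created as a reflection catalog entry if it
    /// is not configured yet; identifiers with any other number of segments
    /// are ignored.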
pub fn with_dataset(mut self, identifier: Identifier, config: DatasetConfig) -> Self {
let segments = identifier.segments();
let (catalog, schema, table, needs_default_catalog) = match segments {
[s, t] => ("default", s.as_str(), t.as_str(), true),
[c, s, t] => (c.as_str(), s.as_str(), t.as_str(), false),
_ => return self,
};
self.datasets
.entry(catalog.to_string())
.or_default()
.entry(schema.to_string())
.or_default()
.insert(table.to_string(), config);
if needs_default_catalog && !self.catalogs.contains_key(catalog) {
self.catalogs.insert(
catalog.to_string(),
CatalogEntry {
default: true,
config: CatalogConfig::Reflection,
},
);
}
self
}
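    /// Adds or replaces a named catalog entry.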
pub fn with_catalog(mut self, name: &str, entry: CatalogEntry) -> Self {
self.catalogs.insert(name.to_string(), entry);
self
}
}
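/// Executor that compiles Hamelin statements into DataFusion logical plans and
/// runs them on an embedded [`SessionContext`].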
pub struct DataFusionExecutor {
ctx: SessionContext,
dataset_store: DatasetStore,
}
impl DataFusionExecutor {
pub fn new() -> Self {
Self {
ctx: SessionContext::new(),
dataset_store: DatasetStore::new(HashMap::new(), HashMap::new()),
}
}
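    /// Builds an executor whose dataset store is populated from `config`.
    ///
    /// Illustrative usage (the identifier and dataset config are placeholders):
    ///
    /// ```ignore
    /// let config = DataFusionExecutorConfig::default()
    ///     .with_dataset(identifier, dataset_config);
    /// let executor = DataFusionExecutor::from_config(config);
    /// ```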
pub fn from_config(config: DataFusionExecutorConfig) -> Self {
let mut executor = Self::new();
executor.dataset_store = DatasetStore::new(config.datasets, config.catalogs);
executor
}
pub fn session_context(&self) -> &SessionContext {
&self.ctx
}
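    /// Registers `provider` under `table` in the session context, creating the
    /// target catalog and schema first if they do not exist. Identifiers must
    /// have one to three segments.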
async fn register_provider(
&self,
table: Identifier,
provider: Arc<dyn TableProvider>,
) -> Result<(), ExecutorError> {
let segments = table.segments();
let table_ref = match segments {
[name] => TableReference::bare(name.as_str()),
[schema, name] => TableReference::partial(schema.as_str(), name.as_str()),
[catalog, schema, name] => {
TableReference::full(catalog.as_str(), schema.as_str(), name.as_str())
}
_ => {
return Err(ExecutorError::ConfigurationError(
anyhow::anyhow!(
"Invalid table identifier: expected 1-3 parts, got {}",
segments.len()
)
.into(),
))
}
};
self.ensure_catalog_schema(&table_ref)?;
self.ctx
.register_table(table_ref, provider)
.map_err(|e| ExecutorError::ConfigurationError(e.into()))?;
Ok(())
}
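    /// Ensures the catalog and schema referenced by `table_ref` exist in the
    /// session, creating in-memory ones as needed. Bare and partial references
    /// fall back to the session's default catalog and schema.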
fn ensure_catalog_schema(&self, table_ref: &TableReference) -> Result<(), ExecutorError> {
let state = self.ctx.state();
let options = state.config().options();
let default_catalog = options.catalog.default_catalog.clone();
let default_schema = options.catalog.default_schema.clone();
let (catalog_name, schema_name) = match table_ref {
TableReference::Bare { .. } => (default_catalog, default_schema),
TableReference::Partial { schema, .. } => (default_catalog, schema.to_string()),
TableReference::Full {
catalog, schema, ..
} => (catalog.to_string(), schema.to_string()),
};
let catalog_list = state.catalog_list().clone();
let catalog = catalog_list.catalog(&catalog_name).unwrap_or_else(|| {
let catalog: Arc<dyn DFCatalogProvider> = Arc::new(MemoryCatalogProvider::new());
catalog_list.register_catalog(catalog_name.clone(), Arc::clone(&catalog));
catalog
});
if catalog.schema(&schema_name).is_none() {
catalog
.register_schema(&schema_name, Arc::new(MemorySchemaProvider::new()))
.map_err(|e| ExecutorError::ConfigurationError(e.into()))?;
}
Ok(())
}
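    /// Resolves a dataset config into its columns and a table provider.
    /// File-backed datasets first register any S3 buckets they reference;
    /// view datasets are compiled recursively, with `views_in_progress`
    /// detecting circular view dependencies.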
#[async_recursion]
async fn resolve_dataset(
&self,
identifier: &Identifier,
canonical: &Identifier,
config: &DatasetConfig,
views_in_progress: &mut HashSet<Identifier>,
) -> Result<(Vec<Column>, Arc<dyn TableProvider>), ExecutorError> {
match config {
DatasetConfig::Files(files) => {
let s3_buckets = extract_s3_buckets(&files.paths);
if !s3_buckets.is_empty() {
let sdk_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
for (scheme, bucket) in s3_buckets {
register_s3_bucket(
&self.ctx,
&sdk_config,
&scheme,
&bucket,
files.region.as_deref(),
files.role_arn.as_deref(),
&files.static_credentials,
files.endpoint_url.as_deref(),
files.force_path_style,
files.allow_http,
)
.await?;
}
}
resolve_files(files, &self.ctx).await
}
DatasetConfig::Mem(mem) => resolve_mem(&mem.columns),
DatasetConfig::Iceberg(iceberg) => {
resolve_iceberg_metadata(&iceberg.metadata_location, iceberg.region.as_deref())
.await
}
DatasetConfig::View(view) => {
if !views_in_progress.insert(canonical.clone()) {
return Err(ExecutorError::ConfigurationError(
anyhow::anyhow!(
"Circular view dependency detected: '{}' references itself \
(directly or transitively)",
identifier
)
.into(),
));
}
let view_name = format!("view '{}'", identifier);
let translated = self
.compile_query_helper(&view.query, None, Some(&view_name), views_in_progress)
.await?;
views_in_progress.remove(canonical);
match translated {
TranslatedStatement::Query {
plan,
output_schema,
} => {
let provider: Arc<dyn TableProvider> = Arc::new(ViewTable::new(plan, None));
Ok((output_schema.clone(), provider))
}
TranslatedStatement::Dml { .. } => Err(ExecutorError::ConfigurationError(
anyhow::anyhow!(
"View '{}' query must be a SELECT, not a DML statement",
identifier
)
.into(),
)),
}
}
}
}
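    /// Returns the provider for a locally configured dataset, reusing a table
    /// already registered in the session context when possible and otherwise
    /// resolving and registering it.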
async fn resolve_local_table(
&self,
identifier: &Identifier,
canonical: &Identifier,
dataset_config: &DatasetConfig,
views_in_progress: &mut HashSet<Identifier>,
) -> Result<Arc<dyn TableProvider>, ExecutorError> {
let segments = identifier.segments();
let table_ref = match segments {
[name] => TableReference::bare(name.as_str()),
[schema, name] => TableReference::partial(schema.as_str(), name.as_str()),
[catalog, schema, name] => {
TableReference::full(catalog.as_str(), schema.as_str(), name.as_str())
}
_ => {
return Err(ExecutorError::ConfigurationError(
anyhow::anyhow!(
"Invalid table identifier: expected 1-3 parts, got {}",
segments.len()
)
.into(),
))
}
};
if let Ok(provider) = self.ctx.table_provider(table_ref).await {
return Ok(provider);
}
let (_columns, provider) = self
.resolve_dataset(identifier, canonical, dataset_config, views_in_progress)
.await?;
self.register_provider(identifier.clone(), provider.clone())
.await?;
Ok(provider)
}
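    /// Resolves each referenced identifier to a table provider. Identifiers are
    /// split into locally configured datasets, Iceberg REST catalog tables
    /// (loaded concurrently per catalog), and tables already registered with
    /// the session context.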
async fn resolve_tables(
&self,
identifiers: &[Identifier],
views_in_progress: &mut HashSet<Identifier>,
) -> Result<Vec<ResolvedTable>, ExecutorError> {
let mut local_refs: Vec<(Identifier, Identifier, DatasetConfig)> = vec![];
let mut iceberg_refs: HashMap<String, Vec<(Identifier, CatalogConfig, String, String)>> =
HashMap::new();
let mut already_registered: Vec<Identifier> = vec![];
for identifier in identifiers {
let segments = identifier.segments();
let seg_strs: Vec<&str> = segments.iter().map(|s| s.as_str()).collect();
let resolved = match self.dataset_store.resolve(&seg_strs) {
Ok(r) => r,
Err(_) => {
already_registered.push(identifier.clone());
continue;
}
};
match resolved {
ResolvedDataset::Local {
catalog: cat,
schema: sch,
table: tbl,
config,
..
} => {
let canonical: Identifier = CompoundIdentifier::new(
SimpleIdentifier::new(&cat),
SimpleIdentifier::new(&sch),
vec![SimpleIdentifier::new(&tbl)],
)
.into();
local_refs.push((identifier.clone(), canonical, config));
}
ResolvedDataset::Catalog {
catalog_name,
catalog_config,
namespace,
table,
} => {
iceberg_refs.entry(catalog_name).or_default().push((
identifier.clone(),
catalog_config,
namespace,
table,
));
}
}
}
let mut results = Vec::new();
for (identifier, canonical, dataset_config) in &local_refs {
let provider = self
.resolve_local_table(identifier, canonical, dataset_config, views_in_progress)
.await?;
let columns = arrow_schema_to_columns(&provider.schema());
results.push(ResolvedTable {
identifier: identifier.clone(),
columns,
provider,
});
}
for (catalog_name, tables) in &iceberg_refs {
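            // Every table in this group belongs to the same catalog, so take
            // the connection settings from the first entry.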
let (uri, warehouse, extra_properties) = match &tables[0].1 {
CatalogConfig::IcebergRest {
uri,
warehouse,
extra_properties,
} => (uri.clone(), warehouse.clone(), extra_properties.clone()),
CatalogConfig::Reflection => {
return Err(ExecutorError::ConfigurationError(
anyhow::anyhow!(
"Catalog '{}' is configured as 'reflection', which is not supported by the DataFusion executor. \
Reflection catalogs are designed to work only with database-style executors, like Trino.",
catalog_name
)
.into(),
));
}
};
let iceberg_cat = build_iceberg_rest_catalog(
&catalog_name,
&uri,
warehouse.as_deref(),
extra_properties,
)
.await?;
let mut join_set = tokio::task::JoinSet::new();
for (identifier, _, namespace, table) in tables {
let cat = iceberg_cat.clone();
let ns = namespace.clone();
let tbl = table.clone();
let id = identifier.clone();
join_set.spawn(async move {
let (columns, provider) = resolve_iceberg_table(cat, &ns, &tbl).await?;
Ok::<_, ExecutorError>((id, columns, provider))
});
}
while let Some(result) = join_set.join_next().await {
let (identifier, columns, provider) = result
.map_err(|e| ExecutorError::ConfigurationError(anyhow::anyhow!(e).into()))??;
self.register_iceberg_provider(identifier.clone(), provider.clone())
.await?;
results.push(ResolvedTable {
identifier,
columns,
provider,
});
}
}
for identifier in &already_registered {
let segments = identifier.segments();
let table_ref = match segments {
[name] => TableReference::bare(name.as_str()),
[schema, name] => TableReference::partial(schema.as_str(), name.as_str()),
[catalog, schema, name] => {
TableReference::full(catalog.as_str(), schema.as_str(), name.as_str())
}
_ => continue,
};
if let Ok(provider) = self.ctx.table_provider(table_ref).await {
let columns = arrow_schema_to_columns(&provider.schema());
results.push(ResolvedTable {
identifier: identifier.clone(),
columns,
provider,
});
}
}
Ok(results)
}
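    /// Parses and type-checks Hamelin source, resolves the datasets it
    /// references into a catalog, and lowers the typed tree to IR. Returns the
    /// IR statement together with the original source text for error reporting.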
async fn compile_to_ir(
&self,
hamelin: &str,
time_range: Option<&str>,
source_name: Option<&str>,
views_in_progress: &mut HashSet<Identifier>,
) -> Result<(IRStatement, String), ExecutorError> {
let source = hamelin.to_string();
let sn = source_name.map(|s| s.to_string());
let interner = Arc::new(Interner::new());
let WithTranslationErrors {
output: ast,
errors: parse_errors,
} = parse_with_options(
&source,
ParseOptions::builder().interner(interner.clone()).build(),
);
if !parse_errors.is_empty() {
return Err(
ContextualTranslationErrors::with_source_name(source, sn, parse_errors).into(),
);
}
let direct_identifiers = ast.datasets().map_err(|e| {
ExecutorError::from(ContextualTranslationErrors::with_source_name(
source.clone(),
sn.clone(),
e.as_ref().clone().single(),
))
})?;
let resolved = self
.resolve_tables(&direct_identifiers, views_in_progress)
.await?;
let catalog = CatalogProvider::default();
for rt in &resolved {
register_resolved(&rt.provider, &rt.identifier, &catalog)?;
}
let catalog = Arc::new(catalog);
let parsed_time_range = time_range
.map(|tr| {
Expression::parse_result(tr)
                    .map(Arc::new)
.map_err(|errors| ExecutorError::from(errors.contextualize(tr)))
})
.transpose()?;
let WithTranslationErrors {
output: typed,
errors: type_errors,
} = type_check_with_options(
ast,
TypeCheckOptions::builder()
.provider(catalog.clone() as Arc<dyn EnvironmentProvider>)
.interner(interner)
.maybe_time_range(parsed_time_range)
.build(),
);
if !type_errors.is_empty() {
return Err(
ContextualTranslationErrors::with_source_name(source, sn, type_errors).into(),
);
}
let ir = hamelin_translation::normalize_with()
.with_provider(catalog)
.with_lower_transform()
.lower(typed.into())
.map_err(|e| {
ExecutorError::from(ContextualTranslationErrors::with_source_name(
source.clone(),
sn.clone(),
e,
))
})?;
Ok((ir, source))
}
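    /// Compiles Hamelin source, with an optional time range expression, into a
    /// translated DataFusion statement.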
pub async fn compile_query(
&self,
hamelin: &str,
time_range: Option<&str>,
) -> Result<TranslatedStatement, ExecutorError> {
self.compile_query_helper(hamelin, time_range, None, &mut HashSet::new())
.await
}
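    /// Internal variant of [`Self::compile_query`] that also threads a source
    /// name for error messages and the set of views currently being resolved.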
async fn compile_query_helper(
&self,
hamelin: &str,
time_range: Option<&str>,
source_name: Option<&str>,
views_in_progress: &mut HashSet<Identifier>,
) -> Result<TranslatedStatement, ExecutorError> {
let (ir, source) = self
.compile_to_ir(hamelin, time_range, source_name, views_in_progress)
.await?;
let translated = translate_statement(&ir, &self.ctx).await.map_err(|e| {
ContextualTranslationErrors::with_source_name(
source,
source_name.map(|s| s.to_string()),
e.as_ref().clone().single(),
)
})?;
Ok(translated)
}
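    /// Registers an Iceberg-backed provider, replacing any table already
    /// registered under the same reference.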
async fn register_iceberg_provider(
&self,
table: Identifier,
provider: Arc<dyn TableProvider>,
) -> Result<(), ExecutorError> {
let segments = table.segments();
let table_ref = match segments {
[name] => TableReference::bare(name.as_str()),
[schema, name] => TableReference::partial(schema.as_str(), name.as_str()),
[catalog, schema, name] => {
TableReference::full(catalog.as_str(), schema.as_str(), name.as_str())
}
_ => {
return Err(ExecutorError::ConfigurationError(
anyhow::anyhow!(
"Invalid table identifier: expected 1-3 parts, got {}",
segments.len()
)
.into(),
))
}
};
if self.ctx.table_provider(table_ref.clone()).await.is_ok() {
self.ctx
.deregister_table(table_ref.clone())
.map_err(|e| ExecutorError::ConfigurationError(e.into()))?;
}
self.ensure_catalog_schema(&table_ref)?;
self.ctx
.register_table(table_ref, provider)
.map_err(|e| ExecutorError::ConfigurationError(e.into()))?;
Ok(())
}
}
impl Default for DataFusionExecutor {
fn default() -> Self {
Self::new()
}
}
impl Debug for DataFusionExecutor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DataFusionExecutor")
.field("dataset_store", &self.dataset_store)
.finish()
}
}
#[async_trait]
impl Executor for DataFusionExecutor {
async fn translate(
&self,
hamelin: &str,
time_range: Option<&str>,
) -> Result<CompiledQuery, ExecutorError> {
let translated = self.compile_query(hamelin, time_range).await?;
match translated {
TranslatedStatement::Query {
plan,
output_schema,
} => Ok(CompiledQuery {
display: format!("{}", plan.display_indent()),
columns: output_schema.clone(),
}),
TranslatedStatement::Dml {
source_plan,
target_table,
distinct_by,
} => {
let mut display = format!("INSERT INTO {}", target_table);
if !distinct_by.is_empty() {
let cols: Vec<String> = distinct_by.iter().map(|id| id.to_string()).collect();
display.push_str(&format!(" DISTINCT BY {}", cols.join(", ")));
}
display.push_str(&format!("\n{}", source_plan.display_indent()));
Ok(CompiledQuery {
display,
columns: vec![],
})
}
}
}
async fn ir(
&self,
hamelin: &str,
time_range: Option<&str>,
) -> Result<CompiledQuery, ExecutorError> {
let (ir, _) = self
.compile_to_ir(hamelin, time_range, None, &mut HashSet::new())
.await?;
let columns = ir.pipeline.output_schema.as_struct().into();
Ok(CompiledQuery {
display: ir.to_string(),
columns,
})
}
async fn execute_query(
&self,
hamelin: &str,
time_range: Option<&str>,
) -> Result<ResultSet, ExecutorError> {
let translated = self.compile_query(hamelin, time_range).await?;
let (plan, output_schema) = translated.query().ok_or_else(|| {
ExecutorError::QueryError(anyhow::anyhow!("expected query statement, got DML").into())
})?;
let df = self
.ctx
.execute_logical_plan(plan)
.await
.map_err(|e| ExecutorError::QueryError(e.into()))?;
let batches = df
.collect()
.await
.map_err(|e| ExecutorError::QueryError(e.into()))?;
arrow_batches_to_result_set(&batches, output_schema)
}
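    // Executes an INSERT: the compiled source plan is optionally anti-joined
    // against the target on the DISTINCT BY columns, appended to the target
    // table, and the reported row count is returned.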
async fn execute_dml(
&self,
hamelin: &str,
time_range: Option<&str>,
) -> Result<usize, ExecutorError> {
let translated = self.compile_query(hamelin, time_range).await?;
let (source_plan, target_table, distinct_by) = translated.dml().ok_or_else(|| {
ExecutorError::QueryError(anyhow::anyhow!("expected DML statement, got query").into())
})?;
let segments = target_table.segments();
let table_name = segments
.iter()
.map(|s| s.as_str())
.collect::<Vec<_>>()
.join(".");
let table_ref = match segments {
[name] => TableReference::bare(name.as_str()),
[schema, name] => TableReference::partial(schema.as_str(), name.as_str()),
[catalog, schema, name] => {
TableReference::full(catalog.as_str(), schema.as_str(), name.as_str())
}
_ => {
return Err(ExecutorError::QueryError(
anyhow::anyhow!("Invalid DML target table identifier").into(),
))
}
};
let table_provider = self.ctx.table_provider(table_ref).await.map_err(|e| {
ExecutorError::QueryError(
anyhow::anyhow!("Target table '{}' not found: {}", table_name, e).into(),
)
})?;
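        // With DISTINCT BY, drop source rows whose key columns already exist
        // in the target by anti-joining the source against a scan of the
        // target before inserting.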
let effective_source = if distinct_by.is_empty() {
source_plan
} else {
let target_scan = LogicalPlanBuilder::scan(
table_name.clone(),
Arc::new(DefaultTableSource::new(table_provider.clone())),
None,
)
.map_err(|e| {
ExecutorError::QueryError(
anyhow::anyhow!("Failed to scan target table for DISTINCT BY: {}", e).into(),
)
})?
.build()
.map_err(|e| {
ExecutorError::QueryError(
anyhow::anyhow!("Failed to build target scan: {}", e).into(),
)
})?;
let build_field_expr = |segments: &[SimpleIdentifier]| -> Expr {
let mut expr = ident(segments[0].as_str());
for seg in &segments[1..] {
expr = core_fn::get_field(expr, seg.as_str());
}
expr
};
            // The anti-join compares the same key expressions on both sides.
            let left_keys: Vec<Expr> = distinct_by
                .iter()
                .map(|id| build_field_expr(id.segments()))
                .collect();
            let right_keys = left_keys.clone();
LogicalPlanBuilder::from(source_plan)
.join_with_expr_keys(
target_scan,
datafusion::logical_expr::JoinType::LeftAnti,
(left_keys, right_keys),
None,
)
.map_err(|e| {
ExecutorError::QueryError(
anyhow::anyhow!("Failed to build anti-join for DISTINCT BY: {}", e).into(),
)
})?
.build()
.map_err(|e| {
ExecutorError::QueryError(
anyhow::anyhow!("Failed to finalize anti-join plan: {}", e).into(),
)
})?
};
let insert_plan = LogicalPlanBuilder::insert_into(
effective_source,
&table_name,
Arc::new(DefaultTableSource::new(table_provider)),
InsertOp::Append,
)
.map_err(|e| {
ExecutorError::QueryError(anyhow::anyhow!("Failed to build INSERT plan: {}", e).into())
})?
.build()
.map_err(|e| {
ExecutorError::QueryError(
anyhow::anyhow!("Failed to finalize INSERT plan: {}", e).into(),
)
})?;
let df = self
.ctx
.execute_logical_plan(insert_plan)
.await
.map_err(|e| ExecutorError::QueryError(e.into()))?;
let batches = df
.collect()
.await
.map_err(|e| ExecutorError::QueryError(e.into()))?;
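        // The INSERT plan yields a single batch whose first column carries the
        // number of rows written; report 0 if it is missing.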
if batches.is_empty() {
return Ok(0);
}
let batch = &batches[0];
if batch.num_columns() == 0 || batch.num_rows() == 0 {
return Ok(0);
}
let col = batch.column(0);
if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
if !arr.is_empty() {
return Ok(arr.value(0) as usize);
}
}
if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
if !arr.is_empty() {
return Ok(arr.value(0) as usize);
}
}
Ok(0)
}
async fn create_dataset(
&self,
_dataset: Identifier,
_columns: Vec<Column>,
) -> Result<(), ExecutorError> {
Err(ExecutorError::ConfigurationError(
anyhow::anyhow!(
"DataFusion local tables must be configured via DatasetConfig::Mem, not create_dataset"
)
.into(),
))
}
async fn resolve_datasets(
&self,
datasets: &[Identifier],
) -> Result<Vec<Vec<Column>>, ExecutorError> {
let resolved = self.resolve_tables(datasets, &mut HashSet::new()).await?;
let resolved_map: HashMap<&Identifier, &Vec<Column>> = resolved
.iter()
.map(|rt| (&rt.identifier, &rt.columns))
.collect();
let mut results = Vec::with_capacity(datasets.len());
for id in datasets {
if let Some(cols) = resolved_map.get(id) {
results.push((*cols).clone());
continue;
}
let segments = id.segments();
let table_ref = match segments {
[name] => TableReference::bare(name.as_str()),
[schema, name] => TableReference::partial(schema.as_str(), name.as_str()),
[catalog, schema, name] => {
TableReference::full(catalog.as_str(), schema.as_str(), name.as_str())
}
_ => {
return Err(ExecutorError::QueryError(
anyhow::anyhow!("Dataset '{}' not found", id).into(),
))
}
};
let provider = self.ctx.table_provider(table_ref).await.map_err(|_| {
ExecutorError::QueryError(anyhow::anyhow!("Dataset '{}' not found", id).into())
})?;
results.push(arrow_schema_to_columns(&provider.schema()));
}
Ok(results)
}
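    // Describes the default catalog by listing every namespace and table of
    // the configured Iceberg REST catalog and converting each table schema to
    // columns. A reflection-configured default catalog is rejected; no default
    // catalog yields an empty result.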
async fn reflect_catalog(&self) -> Result<hamelin_lib::catalog::Catalog, ExecutorError> {
let mut tables = HashMap::new();
match self.dataset_store.default_catalog_config() {
Some((
catalog_name,
CatalogConfig::IcebergRest {
uri,
warehouse,
extra_properties,
},
)) => {
let iceberg_cat = build_iceberg_rest_catalog(
catalog_name,
uri,
warehouse.as_deref(),
extra_properties.clone(),
)
.await?;
let namespaces = iceberg_cat
.list_namespaces(None)
.await
.map_err(|e| ExecutorError::ServerError(e.into()))?;
for ns in &namespaces {
let table_idents = iceberg_cat
.list_tables(ns)
.await
.map_err(|e| ExecutorError::ServerError(e.into()))?;
for table_ident in &table_idents {
let iceberg_table = iceberg_cat
.load_table(table_ident)
.await
.map_err(|e| ExecutorError::ServerError(e.into()))?;
let iceberg_schema = iceberg_table.metadata().current_schema();
let arrow_schema = iceberg::arrow::schema_to_arrow_schema(iceberg_schema)
.map_err(|e| ExecutorError::ServerError(e.into()))?;
let columns = arrow_schema_to_columns(&arrow_schema);
let namespace_str = ns.as_ref().join(".");
let key = format!("{}.{}", namespace_str, table_ident.name());
tables.insert(key, columns);
}
}
}
Some((_, CatalogConfig::Reflection)) => {
return Err(ExecutorError::ConfigurationError(
anyhow::anyhow!(
"Default catalog is configured as 'reflection', which is not supported \
by the DataFusion executor. Reflection catalogs work only with \
database-style executors like Trino."
)
.into(),
));
}
            // No default catalog is configured; return an empty catalog.
            None => {}
}
Ok(hamelin_lib::catalog::Catalog(tables))
}
}
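/// Registers a resolved table's columns with the Hamelin catalog provider under
/// its SQL identifier, converting each Arrow-derived column name and type into
/// the catalog's representation.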
fn register_resolved(
provider: &Arc<dyn TableProvider>,
identifier: &Identifier,
catalog: &CatalogProvider,
) -> Result<(), ExecutorError> {
let columns = arrow_schema_to_columns(&provider.schema());
let cols_map = columns
.iter()
.map(|c| {
let id = c.name.parse().map_err(|e| {
ExecutorError::ConfigurationError(
anyhow::anyhow!("Invalid column name '{}': {}", c.name, e).into(),
)
})?;
let typ = c.typ.clone().try_into().map_err(|e| {
ExecutorError::ConfigurationError(
anyhow::anyhow!("Invalid column type: {}", e).into(),
)
})?;
Ok((id, typ))
})
.collect::<Result<ordermap::OrderMap<_, _>, ExecutorError>>()?;
let sql_ident = identifier.clone().into();
catalog.set(sql_ident, cols_map);
Ok(())
}