use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::datasource::TableProvider;
use hamelin_executor::executor::ExecutorError;
use hamelin_lib::catalog::Column;
use iceberg::io::{FileIOBuilder, LocalFsStorageFactory, StorageFactory};
use iceberg::table::StaticTable;
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableIdent};
use iceberg_catalog_rest::RestCatalogBuilder;
use iceberg_datafusion::{IcebergStaticTableProvider, IcebergTableProvider};
use iceberg_storage_opendal::OpenDalStorageFactory;
use crate::arrow::arrow_schema_to_columns;
/// Builds an Iceberg REST catalog client backed by OpenDAL's S3 storage.
///
/// The explicit `uri` and (optional) `warehouse` arguments always win:
/// matching keys in `extra_properties` are dropped rather than allowed to
/// overwrite them.
///
/// Returns a `ConfigurationError` when the REST catalog cannot be reached.
pub async fn build_iceberg_rest_catalog(
    name: &str,
    uri: &str,
    warehouse: Option<&str>,
    extra_properties: HashMap<String, String>,
) -> Result<Arc<dyn Catalog>, ExecutorError> {
    // Start from the user-supplied properties minus the reserved keys, then
    // layer the authoritative connection settings on top.
    let mut props: HashMap<String, String> = extra_properties
        .into_iter()
        .filter(|(k, _)| k != "uri" && k != "warehouse")
        .collect();
    props.insert("uri".to_string(), uri.to_string());
    if let Some(wh) = warehouse {
        props.insert("warehouse".to_string(), wh.to_string());
    }

    // All table data/metadata access goes through the S3 backend.
    let storage_factory: Arc<dyn StorageFactory> = Arc::new(OpenDalStorageFactory::S3 {
        configured_scheme: "s3".to_string(),
        customized_credential_load: None,
    });

    let catalog = RestCatalogBuilder::default()
        .with_storage_factory(storage_factory)
        .load(name, props)
        .await
        .map_err(|e| {
            ExecutorError::ConfigurationError(
                anyhow::anyhow!(
                    "Failed to connect to Iceberg REST catalog '{}': {}",
                    name,
                    e
                )
                .into(),
            )
        })?;
    Ok(Arc::new(catalog))
}
/// Resolves an Iceberg table from a live catalog into a DataFusion
/// [`TableProvider`].
///
/// Abstracted behind a trait so callers can swap in alternative resolution
/// strategies (e.g. for testing) in place of [`DefaultIcebergTableResolver`].
#[async_trait]
pub trait IcebergTableResolver: Send + Sync {
    /// Looks up `namespace`.`table_name` in `catalog` and returns a provider
    /// that DataFusion can scan.
    ///
    /// # Errors
    /// Implementations return an [`ExecutorError`] when the table cannot be
    /// found or loaded.
    async fn resolve_table(
        &self,
        catalog: Arc<dyn Catalog>,
        namespace: &str,
        table_name: &str,
    ) -> Result<Arc<dyn TableProvider>, ExecutorError>;
}
/// Stateless default implementation of [`IcebergTableResolver`] that loads
/// tables via [`IcebergTableProvider::try_new`].
#[derive(Debug, Clone, Default)]
pub struct DefaultIcebergTableResolver {}
#[async_trait]
impl IcebergTableResolver for DefaultIcebergTableResolver {
    /// Looks the table up through the catalog and wraps it in a DataFusion
    /// provider; any load failure is surfaced as a `ConfigurationError`
    /// naming the fully-qualified table.
    async fn resolve_table(
        &self,
        catalog: Arc<dyn Catalog>,
        namespace: &str,
        table_name: &str,
    ) -> Result<Arc<dyn TableProvider>, ExecutorError> {
        let ns_ident = NamespaceIdent::new(namespace.to_string());
        match IcebergTableProvider::try_new(catalog, ns_ident, table_name.to_string()).await {
            Ok(provider) => Ok(Arc::new(provider)),
            Err(e) => Err(ExecutorError::ConfigurationError(
                anyhow::anyhow!(
                    "Failed to load Iceberg table '{}.{}': {}",
                    namespace,
                    table_name,
                    e
                )
                .into(),
            )),
        }
    }
}
/// Picks the storage factory for a metadata location and normalizes the
/// location string (`s3a://` → `s3://`, which the S3 backend expects).
///
/// Any URL scheme other than `s3`/`s3a` is rejected; scheme-less locations
/// are treated as local filesystem paths.
fn select_metadata_storage(
    metadata_location: &str,
) -> Result<(Arc<dyn StorageFactory>, Cow<'_, str>), ExecutorError> {
    if metadata_location.starts_with("s3://") || metadata_location.starts_with("s3a://") {
        let location = metadata_location
            .strip_prefix("s3a://")
            .map(|rest| Cow::Owned(format!("s3://{}", rest)))
            .unwrap_or(Cow::Borrowed(metadata_location));
        Ok((
            Arc::new(OpenDalStorageFactory::S3 {
                configured_scheme: "s3".to_string(),
                customized_credential_load: None,
            }),
            location,
        ))
    } else if metadata_location.contains("://") {
        // Reject EVERY non-s3 scheme up front. Previously only a fixed list
        // (gs, oss, abfs, abfss) was caught, so schemes like hdfs:// or
        // wasb:// fell through to the local-fs branch and failed later with
        // a confusing read error instead of this clear message.
        Err(ExecutorError::ConfigurationError(
            anyhow::anyhow!(
                "Unsupported storage scheme in '{}'; only s3:// and local paths are currently supported",
                metadata_location
            )
            .into(),
        ))
    } else {
        Ok((
            Arc::new(LocalFsStorageFactory),
            Cow::Borrowed(metadata_location),
        ))
    }
}

/// Loads an Iceberg table directly from a metadata file — no catalog needed —
/// and returns its column list together with a static (snapshot-pinned)
/// DataFusion table provider.
///
/// `region`, when provided, is forwarded as the `s3.region` FileIO property.
///
/// # Errors
/// Returns a `ConfigurationError` for unsupported storage schemes, unreadable
/// metadata, or provider construction failures.
pub async fn resolve_iceberg_metadata(
    metadata_location: &str,
    region: Option<&str>,
) -> Result<(Vec<Column>, Arc<dyn TableProvider>), ExecutorError> {
    let (factory, normalized_location) = select_metadata_storage(metadata_location)?;

    let mut file_io_builder = FileIOBuilder::new(factory);
    if let Some(r) = region {
        file_io_builder = file_io_builder.with_prop("s3.region", r);
    }
    let file_io = file_io_builder.build();

    // The identifier is synthetic: a static table loaded from a metadata file
    // has no catalog-assigned name, but the API still requires one.
    let table_ident = TableIdent::new(
        NamespaceIdent::new("default".to_string()),
        "iceberg_table".to_string(),
    );
    let static_table = StaticTable::from_metadata_file(&normalized_location, table_ident, file_io)
        .await
        .map_err(|e| {
            ExecutorError::ConfigurationError(
                anyhow::anyhow!(
                    "Failed to load Iceberg table from metadata '{}': {}",
                    metadata_location,
                    e
                )
                .into(),
            )
        })?;
    let table = static_table.into_table();
    let provider = IcebergStaticTableProvider::try_new_from_table(table)
        .await
        .map_err(|e| {
            ExecutorError::ConfigurationError(
                anyhow::anyhow!(
                    "Failed to create table provider from metadata '{}': {}",
                    metadata_location,
                    e
                )
                .into(),
            )
        })?;
    let schema = provider.schema();
    let columns = arrow_schema_to_columns(&schema);
    Ok((columns, Arc::new(provider)))
}