#![cfg(feature = "iceberg-datafusion")]
pub mod iceberg_scan {
use std::sync::Arc;
use datafusion::catalog::TableProvider;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::execution::SessionStateBuilder;
use futures::TryStreamExt;
use iceberg::table::Table;
pub async fn iceberg_table_provider(table: Table) -> DfResult<Arc<dyn TableProvider>> {
let file_paths = collect_file_paths(&table).await?;
listing_provider_from_paths(file_paths).await
}
pub async fn iceberg_table_provider_at_snapshot(
table: Table,
snapshot_id: i64,
) -> DfResult<Arc<dyn TableProvider>> {
let scan = table
.scan()
.snapshot_id(snapshot_id)
.build()
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let task_stream = scan
.plan_files()
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let tasks: Vec<iceberg::scan::FileScanTask> = task_stream
.try_collect()
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let file_paths: Vec<String> = tasks
.iter()
.map(|t| local_path(t.data_file_path()))
.collect();
listing_provider_from_paths(file_paths).await
}
async fn collect_file_paths(table: &Table) -> DfResult<Vec<String>> {
let scan = table
.scan()
.build()
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let task_stream = scan
.plan_files()
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let tasks: Vec<iceberg::scan::FileScanTask> = task_stream
.try_collect()
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(tasks
.iter()
.map(|t| local_path(t.data_file_path()))
.collect())
}
fn local_path(uri: &str) -> String {
if let Some(p) = uri.strip_prefix("file://") {
p.to_string()
} else {
uri.to_string()
}
}
async fn listing_provider_from_paths(paths: Vec<String>) -> DfResult<Arc<dyn TableProvider>> {
if paths.is_empty() {
use arrow::datatypes::{Schema, SchemaRef};
use datafusion::datasource::MemTable;
let empty_schema: SchemaRef = Arc::new(Schema::empty());
return Ok(Arc::new(MemTable::try_new(empty_schema, vec![vec![]])?));
}
let listing_url = ListingTableUrl::parse(&paths[0])?;
let format = Arc::new(ParquetFormat::default().with_enable_pruning(true));
let listing_options = ListingOptions::new(format)
.with_file_extension(".parquet")
.with_collect_stat(true);
let state = SessionStateBuilder::new().with_default_features().build();
let schema = listing_options.infer_schema(&state, &listing_url).await?;
let config = ListingTableConfig::new(listing_url)
.with_listing_options(listing_options)
.with_schema(schema);
Ok(Arc::new(ListingTable::try_new(config)?))
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use iceberg::io::LocalFsStorageFactory;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation};
use super::iceberg_scan::iceberg_table_provider;
#[tokio::test]
async fn iceberg_table_provider_exposes_schema() {
let dir = tempfile::tempdir().unwrap();
let warehouse = url::Url::from_file_path(dir.path()).unwrap().to_string();
let catalog = MemoryCatalogBuilder::default()
.with_storage_factory(Arc::new(LocalFsStorageFactory))
.load(
"mem",
HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse)]),
)
.await
.unwrap();
let namespace = NamespaceIdent::new("ns".to_string());
catalog
.create_namespace(&namespace, HashMap::new())
.await
.unwrap();
let schema = Schema::builder()
.with_schema_id(0)
.with_fields(vec![Arc::new(NestedField::required(
1,
"id",
Type::Primitive(PrimitiveType::Long),
))])
.build()
.unwrap();
let table = catalog
.create_table(
&namespace,
TableCreation::builder()
.name("t".to_string())
.schema(schema)
.build(),
)
.await
.unwrap();
let provider = iceberg_table_provider(table).await.unwrap();
let _ = datafusion::catalog::TableProvider::schema(&*provider);
}
}