iceberg_datafusion/table/
table_provider_factory.rs1use std::borrow::Cow;
19use std::collections::HashMap;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use datafusion::catalog::{Session, TableProvider, TableProviderFactory};
24use datafusion::error::Result as DFResult;
25use datafusion::logical_expr::CreateExternalTable;
26use datafusion::sql::TableReference;
27use iceberg::arrow::schema_to_arrow_schema;
28use iceberg::io::FileIO;
29use iceberg::table::StaticTable;
30use iceberg::{Error, ErrorKind, Result, TableIdent};
31
32use super::IcebergTableProvider;
33use crate::to_datafusion_error;
34
/// Factory that produces [`IcebergTableProvider`] instances for DataFusion
/// `CREATE EXTERNAL TABLE` commands.
///
/// The factory carries no state of its own: everything it needs (table name,
/// metadata file location and options) is read from the command passed to
/// `create`.
#[derive(Debug, Default)]
pub struct IcebergTableProviderFactory {}

impl IcebergTableProviderFactory {
    /// Returns a new factory; equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}
108
109#[async_trait]
110impl TableProviderFactory for IcebergTableProviderFactory {
111 async fn create(
112 &self,
113 _state: &dyn Session,
114 cmd: &CreateExternalTable,
115 ) -> DFResult<Arc<dyn TableProvider>> {
116 check_cmd(cmd).map_err(to_datafusion_error)?;
117
118 let table_name = &cmd.name;
119 let metadata_file_path = &cmd.location;
120 let options = &cmd.options;
121
122 let table_name_with_ns = complement_namespace_if_necessary(table_name);
123
124 let table = create_static_table(table_name_with_ns, metadata_file_path, options)
125 .await
126 .map_err(to_datafusion_error)?
127 .into_table();
128
129 let schema = schema_to_arrow_schema(table.metadata().current_schema())
130 .map_err(to_datafusion_error)?;
131
132 Ok(Arc::new(IcebergTableProvider::new(table, Arc::new(schema))))
133 }
134}
135
136fn check_cmd(cmd: &CreateExternalTable) -> Result<()> {
137 let CreateExternalTable {
138 schema,
139 table_partition_cols,
140 order_exprs,
141 constraints,
142 column_defaults,
143 ..
144 } = cmd;
145
146 let is_invalid = !schema.fields().is_empty()
148 || !table_partition_cols.is_empty()
149 || !order_exprs.is_empty()
150 || !constraints.is_empty()
151 || !column_defaults.is_empty();
152
153 if is_invalid {
154 return Err(Error::new(
155 ErrorKind::FeatureUnsupported,
156 "Currently we only support reading existing icebergs tables in external table command. To create new table, please use catalog provider.",
157 ));
158 }
159
160 Ok(())
161}
162
163fn complement_namespace_if_necessary(table_name: &TableReference) -> Cow<'_, TableReference> {
174 match table_name {
175 TableReference::Bare { table } => {
176 Cow::Owned(TableReference::partial("default", table.as_ref()))
177 }
178 other => Cow::Borrowed(other),
179 }
180}
181
182async fn create_static_table(
183 table_name: Cow<'_, TableReference>,
184 metadata_file_path: &str,
185 props: &HashMap<String, String>,
186) -> Result<StaticTable> {
187 let table_ident = TableIdent::from_strs(table_name.to_vec())?;
188 let file_io = FileIO::from_path(metadata_file_path)?
189 .with_props(props)
190 .build()?;
191 StaticTable::from_metadata_file(metadata_file_path, table_ident, file_io).await
192}
193
#[cfg(test)]
mod tests {

    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::catalog::TableProviderFactory;
    use datafusion::common::{Constraints, DFSchema};
    use datafusion::execution::session_state::SessionStateBuilder;
    use datafusion::logical_expr::CreateExternalTable;
    use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
    use datafusion::prelude::SessionContext;
    use datafusion::sql::TableReference;

    use super::*;

    /// Arrow schema the tests expect for the `TableMetadataV2.json` fixture:
    /// three non-nullable `Int64` columns, each tagged with its field id.
    fn table_metadata_v2_schema() -> Schema {
        // Every column differs only by name and field id.
        let int64_field = |name: &str, field_id: &str| {
            Field::new(name, DataType::Int64, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                field_id.to_string(),
            )]))
        };

        Schema::new(vec![
            int64_field("x", "1"),
            int64_field("y", "2"),
            int64_field("z", "3"),
        ])
    }

    /// Absolute path of the metadata fixture inside this crate's `testdata`.
    fn table_metadata_location() -> String {
        format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            "TableMetadataV2.json"
        )
    }

    /// A minimal `CREATE EXTERNAL TABLE` command that points at the fixture
    /// and declares nothing else, so it passes `check_cmd`.
    fn create_external_table_cmd() -> CreateExternalTable {
        CreateExternalTable {
            name: TableReference::partial("static_ns", "static_table"),
            location: table_metadata_location(),
            schema: Arc::new(DFSchema::empty()),
            file_type: "iceberg".to_string(),
            options: Default::default(),
            table_partition_cols: Default::default(),
            order_exprs: Default::default(),
            constraints: Constraints::empty(),
            column_defaults: Default::default(),
            if_not_exists: false,
            temporary: false,
            definition: None,
            unbounded: false,
        }
    }

    /// Calling the factory directly yields a provider with the fixture schema.
    #[tokio::test]
    async fn test_schema_of_created_table() {
        let session_state = SessionStateBuilder::new().build();
        let command = create_external_table_cmd();

        let provider = IcebergTableProviderFactory::new()
            .create(&session_state, &command)
            .await
            .expect("create table failed");

        let expected = table_metadata_v2_schema();
        assert_eq!(provider.schema().as_ref(), &expected);
    }

    /// Registering the factory under `ICEBERG` lets the SQL statement create
    /// the table, and the registered provider exposes the fixture schema.
    #[tokio::test]
    async fn test_schema_of_created_external_table_sql() {
        let mut session_state = SessionStateBuilder::new().with_default_features().build();
        session_state.table_factories_mut().insert(
            "ICEBERG".to_string(),
            Arc::new(IcebergTableProviderFactory::new()),
        );
        let ctx = SessionContext::new_with_state(session_state);

        // A bare name exercises the namespace-completion path of the factory.
        let table_ref = TableReference::bare("static_table");

        let statement = format!(
            "CREATE EXTERNAL TABLE {} STORED AS ICEBERG LOCATION '{}'",
            table_ref,
            table_metadata_location()
        );
        let _df = ctx.sql(&statement).await.expect("create table failed");

        let provider = ctx
            .table_provider(table_ref)
            .await
            .expect("table not found");

        let expected = table_metadata_v2_schema();
        assert_eq!(provider.schema().as_ref(), &expected);
    }
}