iceberg_datafusion/table/mod.rs

pub mod metadata_table;
pub mod table_provider_factory;

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::catalog::Session;
use datafusion::common::DataFusionError;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::inspect::MetadataTableType;
use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use metadata_table::IcebergMetadataTableProvider;

use crate::physical_plan::commit::IcebergCommitExec;
use crate::physical_plan::scan::IcebergTableScan;
use crate::physical_plan::write::IcebergWriteExec;
#[derive(Debug, Clone)]
/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
/// managing access to a [`Table`].
pub struct IcebergTableProvider {
    /// The underlying Iceberg table.
    table: Table,
    /// The identifier of a snapshot to scan; when `None`, the current snapshot is used.
    snapshot_id: Option<i64>,
    /// The Arrow schema exposed to DataFusion, converted from the table's Iceberg schema.
    schema: ArrowSchemaRef,
    /// The catalog that the table belongs to; required for writes, optional for reads.
    catalog: Option<Arc<dyn Catalog>>,
}

impl IcebergTableProvider {
    pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
        IcebergTableProvider {
            table,
            snapshot_id: None,
            schema,
            catalog: None,
        }
    }
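
    /// Asynchronously tries to construct a new [`IcebergTableProvider`]
    /// using the given client and table name to fetch an actual [`Table`]
    /// in the provided namespace.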
    pub(crate) async fn try_new(
        client: Arc<dyn Catalog>,
        namespace: NamespaceIdent,
        name: impl Into<String>,
    ) -> Result<Self> {
        let ident = TableIdent::new(namespace, name.into());
        let table = client.load_table(&ident).await?;

        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

        Ok(IcebergTableProvider {
            table,
            snapshot_id: None,
            schema,
            catalog: Some(client),
        })
    }

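    /// Asynchronously tries to construct a new [`IcebergTableProvider`]
    /// using the given table. Can be used to create a table provider from an
    /// existing table regardless of the catalog implementation.
    ///
    /// A minimal usage sketch; the metadata path and identifiers below are
    /// placeholders, and the `StaticTable` setup mirrors the tests at the
    /// bottom of this module:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use datafusion::prelude::SessionContext;
    /// # use iceberg::TableIdent;
    /// # use iceberg::io::FileIO;
    /// # use iceberg::table::StaticTable;
    /// # use iceberg_datafusion::IcebergTableProvider;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let metadata_path = "/path/to/metadata.json"; // placeholder path
    /// let file_io = FileIO::from_path(metadata_path)?.build()?;
    /// let ident = TableIdent::from_strs(["ns", "tbl"])?;
    /// let table = StaticTable::from_metadata_file(metadata_path, ident, file_io)
    ///     .await?
    ///     .into_table();
    ///
    /// let provider = IcebergTableProvider::try_new_from_table(table).await?;
    /// let ctx = SessionContext::new();
    /// ctx.register_table("tbl", Arc::new(provider))?;
    /// # Ok(())
    /// # }
    /// ```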
    pub async fn try_new_from_table(table: Table) -> Result<Self> {
        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
        Ok(IcebergTableProvider {
            table,
            snapshot_id: None,
            schema,
            catalog: None,
        })
    }

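    /// Asynchronously tries to construct a new [`IcebergTableProvider`]
    /// pinned to a specific snapshot of the given table, using that
    /// snapshot's schema. Fails if the snapshot id is not found in the
    /// table metadata. Like [`IcebergTableProvider::try_new_from_table`],
    /// the resulting provider has no catalog attached and is read-only.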
    pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
        let snapshot = table
            .metadata()
            .snapshot_by_id(snapshot_id)
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!(
                        "snapshot id {snapshot_id} not found in table {}",
                        table.identifier().name()
                    ),
                )
            })?;
        let schema = snapshot.schema(table.metadata())?;
        let schema = Arc::new(schema_to_arrow_schema(&schema)?);
        Ok(IcebergTableProvider {
            table,
            snapshot_id: Some(snapshot_id),
            schema,
            catalog: None,
        })
    }

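    /// Returns a provider for one of the table's metadata tables (for
    /// example snapshots or manifests), backed by the same underlying
    /// Iceberg table.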
    pub(crate) fn metadata_table(&self, r#type: MetadataTableType) -> IcebergMetadataTableProvider {
        IcebergMetadataTableProvider {
            table: self.table.clone(),
            r#type,
        }
    }
}

#[async_trait]
impl TableProvider for IcebergTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        _limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(IcebergTableScan::new(
            self.table.clone(),
            self.snapshot_id,
            self.schema.clone(),
            projection,
            filters,
        )))
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
        // Accept all filters as inexact: the scan prunes with them where it
        // can, and DataFusion re-applies them afterwards for correctness.
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    async fn insert_into(
        &self,
        _state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if !self
            .table
            .metadata()
            .default_partition_spec()
            .is_unpartitioned()
        {
            // TODO: support writing to partitioned tables.
            return Err(DataFusionError::NotImplemented(
                "IcebergTableProvider::insert_into does not support partitioned tables yet"
                    .to_string(),
            ));
        }

        // A catalog is required to commit the newly written data files;
        // providers built without one (e.g. from a static table) are read-only.
        let Some(catalog) = self.catalog.clone() else {
            return Err(DataFusionError::Execution(
                "Catalog cannot be none for insert_into".to_string(),
            ));
        };

        // Write the input partitions out as Iceberg data files.
        let write_plan = Arc::new(IcebergWriteExec::new(
            self.table.clone(),
            input,
            self.schema.clone(),
        ));

        // Merge the write output into a single partition so that all data
        // files are committed together in one snapshot.
        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));

        Ok(Arc::new(IcebergCommitExec::new(
            self.table.clone(),
            catalog,
            coalesce_partitions,
            self.schema.clone(),
        )))
    }
}

#[cfg(test)]
mod tests {
    use datafusion::common::Column;
    use datafusion::prelude::SessionContext;
    use iceberg::TableIdent;
    use iceberg::io::FileIO;
    use iceberg::table::{StaticTable, Table};

    use super::*;

    async fn get_test_table_from_metadata_file() -> Table {
        let metadata_file_name = "TableMetadataV2Valid.json";
        let metadata_file_path = format!(
            "{}/tests/test_data/{}",
            env!("CARGO_MANIFEST_DIR"),
            metadata_file_name
        );
        let file_io = FileIO::from_path(&metadata_file_path)
            .unwrap()
            .build()
            .unwrap();
        let static_identifier = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();
        let static_table =
            StaticTable::from_metadata_file(&metadata_file_path, static_identifier, file_io)
                .await
                .unwrap();
        static_table.into_table()
    }

    #[tokio::test]
    async fn test_try_new_from_table() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    #[tokio::test]
    async fn test_try_new_from_table_snapshot() {
        let table = get_test_table_from_metadata_file().await;
        let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
        let table_provider =
            IcebergTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
                .await
                .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    #[tokio::test]
    async fn test_physical_input_schema_consistent_with_logical_input_schema() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap();
        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok())
    }
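
    // A minimal additional check (not part of the original suite): filter
    // expressions should plan through the provider's inexact pushdown path,
    // leaving DataFusion free to re-apply the predicate above the scan.
    #[tokio::test]
    async fn test_filter_pushdown_plans() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergTableProvider::try_new_from_table(table)
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT x FROM mytable WHERE z > 0").await.unwrap();
        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok());
    }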
}