iceberg_datafusion/table/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod metadata_table;
pub mod table_provider_factory;

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::catalog::Session;
use datafusion::common::DataFusionError;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::inspect::MetadataTableType;
use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use metadata_table::IcebergMetadataTableProvider;

use crate::physical_plan::commit::IcebergCommitExec;
use crate::physical_plan::scan::IcebergTableScan;
use crate::physical_plan::write::IcebergWriteExec;

/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
/// managing access to a [`Table`].
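///
/// # Example
///
/// A minimal sketch of registering the provider with a DataFusion
/// `SessionContext` and querying it via SQL (assumes an Iceberg
/// [`Table`] obtained elsewhere, e.g. from a catalog or a metadata file):
///
/// ```ignore
/// use std::sync::Arc;
/// use datafusion::prelude::SessionContext;
///
/// let provider = IcebergTableProvider::try_new_from_table(table).await?;
/// let ctx = SessionContext::new();
/// ctx.register_table("my_table", Arc::new(provider))?;
/// let df = ctx.sql("SELECT * FROM my_table").await?;
/// ```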
#[derive(Debug, Clone)]
pub struct IcebergTableProvider {
    /// A table in the catalog.
    table: Table,
    /// Table snapshot id that will be queried via this provider.
    snapshot_id: Option<i64>,
    /// A reference-counted arrow `Schema`.
    schema: ArrowSchemaRef,
    /// The catalog that the table belongs to.
    catalog: Option<Arc<dyn Catalog>>,
}

impl IcebergTableProvider {
    pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
        IcebergTableProvider {
            table,
            snapshot_id: None,
            schema,
            catalog: None,
        }
    }

    /// Asynchronously tries to construct a new [`IcebergTableProvider`]
    /// using the given client and table name to fetch an actual [`Table`]
    /// in the provided namespace.
    pub(crate) async fn try_new(
        client: Arc<dyn Catalog>,
        namespace: NamespaceIdent,
        name: impl Into<String>,
    ) -> Result<Self> {
        let ident = TableIdent::new(namespace, name.into());
        let table = client.load_table(&ident).await?;

        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

        Ok(IcebergTableProvider {
            table,
            snapshot_id: None,
            schema,
            catalog: Some(client),
        })
    }

    /// Asynchronously tries to construct a new [`IcebergTableProvider`]
    /// using the given table. Can be used to create a table provider from an
    /// existing table regardless of the catalog implementation.
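    ///
    /// # Example
    ///
    /// A sketch using a `StaticTable` loaded from a metadata file (the
    /// path and identifier here are hypothetical):
    ///
    /// ```ignore
    /// use iceberg::TableIdent;
    /// use iceberg::io::FileIO;
    /// use iceberg::table::StaticTable;
    ///
    /// let metadata_path = "/path/to/metadata.json";
    /// let file_io = FileIO::from_path(metadata_path)?.build()?;
    /// let ident = TableIdent::from_strs(["ns", "my_table"])?;
    /// let table = StaticTable::from_metadata_file(metadata_path, ident, file_io)
    ///     .await?
    ///     .into_table();
    /// let provider = IcebergTableProvider::try_new_from_table(table).await?;
    /// ```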
    pub async fn try_new_from_table(table: Table) -> Result<Self> {
        let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
        Ok(IcebergTableProvider {
            table,
            snapshot_id: None,
            schema,
            catalog: None,
        })
    }

    /// Asynchronously tries to construct a new [`IcebergTableProvider`]
    /// using a specific snapshot of the given table. Can be used to create a
    /// table provider from an existing table regardless of the catalog
    /// implementation.
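    ///
    /// # Example
    ///
    /// A sketch that pins the provider to a particular snapshot (here the
    /// first snapshot in the table metadata; assumes a `Table` obtained
    /// elsewhere):
    ///
    /// ```ignore
    /// let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
    /// let provider =
    ///     IcebergTableProvider::try_new_from_table_snapshot(table, snapshot_id).await?;
    /// ```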
    pub async fn try_new_from_table_snapshot(table: Table, snapshot_id: i64) -> Result<Self> {
        let snapshot = table
            .metadata()
            .snapshot_by_id(snapshot_id)
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!(
                        "snapshot id {snapshot_id} not found in table {}",
                        table.identifier().name()
                    ),
                )
            })?;
        let schema = snapshot.schema(table.metadata())?;
        let schema = Arc::new(schema_to_arrow_schema(&schema)?);
        Ok(IcebergTableProvider {
            table,
            snapshot_id: Some(snapshot_id),
            schema,
            catalog: None,
        })
    }

    pub(crate) fn metadata_table(&self, r#type: MetadataTableType) -> IcebergMetadataTableProvider {
        IcebergMetadataTableProvider {
            table: self.table.clone(),
            r#type,
        }
    }
}

#[async_trait]
impl TableProvider for IcebergTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        _limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
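        // Delegate to IcebergTableScan, which plans the file scan from the
        // (optionally pinned) snapshot and applies the pushed-down
        // projection and filters.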
        Ok(Arc::new(IcebergTableScan::new(
            self.table.clone(),
            self.snapshot_id,
            self.schema.clone(),
            projection,
            filters,
        )))
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
        // Push down all filters as inexact; the scanner, as the single source
        // of truth, drops any filter it cannot actually push down.
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    async fn insert_into(
        &self,
        _state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        _insert_op: InsertOp,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if !self
            .table
            .metadata()
            .default_partition_spec()
            .is_unpartitioned()
        {
            // TODO: add insert_into support for partitioned tables
            return Err(DataFusionError::NotImplemented(
                "IcebergTableProvider::insert_into does not support partitioned tables yet"
                    .to_string(),
            ));
        }

        let Some(catalog) = self.catalog.clone() else {
            return Err(DataFusionError::Execution(
                "Catalog cannot be None for insert_into".to_string(),
            ));
        };

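        // Write the input batches out as Iceberg data files; the commit
        // step below publishes them to the table in a new snapshot.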
        let write_plan = Arc::new(IcebergWriteExec::new(
            self.table.clone(),
            input,
            self.schema.clone(),
        ));

        // Merge the outputs of write_plan into one so we can commit all files together
        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));

        Ok(Arc::new(IcebergCommitExec::new(
            self.table.clone(),
            catalog,
            coalesce_partitions,
            self.schema.clone(),
        )))
    }
}

#[cfg(test)]
mod tests {
    use datafusion::common::Column;
    use datafusion::prelude::SessionContext;
    use iceberg::TableIdent;
    use iceberg::io::FileIO;
    use iceberg::table::{StaticTable, Table};

    use super::*;

    async fn get_test_table_from_metadata_file() -> Table {
        let metadata_file_name = "TableMetadataV2Valid.json";
        let metadata_file_path = format!(
            "{}/tests/test_data/{}",
            env!("CARGO_MANIFEST_DIR"),
            metadata_file_name
        );
        let file_io = FileIO::from_path(&metadata_file_path)
            .unwrap()
            .build()
            .unwrap();
        let static_identifier = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();
        let static_table =
            StaticTable::from_metadata_file(&metadata_file_path, static_identifier, file_io)
                .await
                .unwrap();
        static_table.into_table()
    }

    #[tokio::test]
    async fn test_try_new_from_table() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    #[tokio::test]
    async fn test_try_new_from_table_snapshot() {
        let table = get_test_table_from_metadata_file().await;
        let snapshot_id = table.metadata().snapshots().next().unwrap().snapshot_id();
        let table_provider =
            IcebergTableProvider::try_new_from_table_snapshot(table.clone(), snapshot_id)
                .await
                .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT * FROM mytable").await.unwrap();
        let df_schema = df.schema();
        let df_columns = df_schema.fields();
        assert_eq!(df_columns.len(), 3);
        let x_column = df_columns.first().unwrap();
        let column_data = format!(
            "{:?}:{:?}",
            x_column.name(),
            x_column.data_type().to_string()
        );
        assert_eq!(column_data, "\"x\":\"Int64\"");
        let has_column = df_schema.has_column(&Column::from_name("z"));
        assert!(has_column);
    }

    #[tokio::test]
    async fn test_physical_input_schema_consistent_with_logical_input_schema() {
        let table = get_test_table_from_metadata_file().await;
        let table_provider = IcebergTableProvider::try_new_from_table(table.clone())
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("mytable", Arc::new(table_provider))
            .unwrap();
        let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap();
        let physical_plan = df.create_physical_plan().await;
        assert!(physical_plan.is_ok())
    }
}