iceberg_datafusion/table/
metadata_table.rs1use std::any::Any;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use datafusion::arrow::array::RecordBatch;
23use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
24use datafusion::catalog::Session;
25use datafusion::datasource::{TableProvider, TableType};
26use datafusion::error::Result as DFResult;
27use datafusion::logical_expr::Expr;
28use datafusion::physical_plan::ExecutionPlan;
29use futures::TryStreamExt;
30use futures::stream::BoxStream;
31use iceberg::arrow::schema_to_arrow_schema;
32use iceberg::inspect::MetadataTableType;
33use iceberg::table::Table;
34
35use crate::physical_plan::metadata_scan::IcebergMetadataScan;
36use crate::to_datafusion_error;
37
38#[derive(Debug, Clone)]
41pub struct IcebergMetadataTableProvider {
42 pub(crate) table: Table,
43 pub(crate) r#type: MetadataTableType,
44}
45
46#[async_trait]
47impl TableProvider for IcebergMetadataTableProvider {
48 fn as_any(&self) -> &dyn Any {
49 self
50 }
51
52 fn schema(&self) -> ArrowSchemaRef {
53 let metadata_table = self.table.inspect();
54 let schema = match self.r#type {
55 MetadataTableType::Snapshots => metadata_table.snapshots().schema(),
56 MetadataTableType::Manifests => metadata_table.manifests().schema(),
57 };
58 schema_to_arrow_schema(&schema).unwrap().into()
59 }
60
61 fn table_type(&self) -> TableType {
62 TableType::Base
63 }
64
65 async fn scan(
66 &self,
67 _state: &dyn Session,
68 _projection: Option<&Vec<usize>>,
69 _filters: &[Expr],
70 _limit: Option<usize>,
71 ) -> DFResult<Arc<dyn ExecutionPlan>> {
72 Ok(Arc::new(IcebergMetadataScan::new(self.clone())))
73 }
74}
75
76impl IcebergMetadataTableProvider {
77 pub async fn scan(self) -> DFResult<BoxStream<'static, DFResult<RecordBatch>>> {
78 let metadata_table = self.table.inspect();
79 let stream = match self.r#type {
80 MetadataTableType::Snapshots => metadata_table.snapshots().scan().await,
81 MetadataTableType::Manifests => metadata_table.manifests().scan().await,
82 }
83 .map_err(to_datafusion_error)?;
84 let stream = stream.map_err(to_datafusion_error);
85 Ok(Box::pin(stream))
86 }
87}