iceberg_datafusion/physical_plan/
scan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::vec;
22
23use datafusion::arrow::array::RecordBatch;
24use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
25use datafusion::error::Result as DFResult;
26use datafusion::execution::{SendableRecordBatchStream, TaskContext};
27use datafusion::physical_expr::EquivalenceProperties;
28use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
29use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
30use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
31use datafusion::prelude::Expr;
32use futures::{Stream, TryStreamExt};
33use iceberg::expr::Predicate;
34use iceberg::table::Table;
35
36use super::expr_to_predicate::convert_filters_to_predicate;
37use crate::to_datafusion_error;
38
39/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
40/// necessary details and computed properties required for execution planning.
41#[derive(Debug)]
42pub struct IcebergTableScan {
43    /// A table in the catalog.
44    table: Table,
45    /// Snapshot of the table to scan.
46    snapshot_id: Option<i64>,
47    /// Stores certain, often expensive to compute,
48    /// plan properties used in query optimization.
49    plan_properties: PlanProperties,
50    /// Projection column names, None means all columns
51    projection: Option<Vec<String>>,
52    /// Filters to apply to the table scan
53    predicates: Option<Predicate>,
54}
55
56impl IcebergTableScan {
57    /// Creates a new [`IcebergTableScan`] object.
58    pub(crate) fn new(
59        table: Table,
60        snapshot_id: Option<i64>,
61        schema: ArrowSchemaRef,
62        projection: Option<&Vec<usize>>,
63        filters: &[Expr],
64    ) -> Self {
65        let output_schema = match projection {
66            None => schema.clone(),
67            Some(projection) => Arc::new(schema.project(projection).unwrap()),
68        };
69        let plan_properties = Self::compute_properties(output_schema.clone());
70        let projection = get_column_names(schema.clone(), projection);
71        let predicates = convert_filters_to_predicate(filters);
72
73        Self {
74            table,
75            snapshot_id,
76            plan_properties,
77            projection,
78            predicates,
79        }
80    }
81
82    pub fn table(&self) -> &Table {
83        &self.table
84    }
85
86    pub fn snapshot_id(&self) -> Option<i64> {
87        self.snapshot_id
88    }
89
90    pub fn projection(&self) -> Option<&[String]> {
91        self.projection.as_deref()
92    }
93
94    pub fn predicates(&self) -> Option<&Predicate> {
95        self.predicates.as_ref()
96    }
97
98    /// Computes [`PlanProperties`] used in query optimization.
99    fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
100        // TODO:
101        // This is more or less a placeholder, to be replaced
102        // once we support output-partitioning
103        PlanProperties::new(
104            EquivalenceProperties::new(schema),
105            Partitioning::UnknownPartitioning(1),
106            EmissionType::Incremental,
107            Boundedness::Bounded,
108        )
109    }
110}
111
112impl ExecutionPlan for IcebergTableScan {
113    fn name(&self) -> &str {
114        "IcebergTableScan"
115    }
116
117    fn as_any(&self) -> &dyn Any {
118        self
119    }
120
121    fn children(&self) -> Vec<&Arc<(dyn ExecutionPlan + 'static)>> {
122        vec![]
123    }
124
125    fn with_new_children(
126        self: Arc<Self>,
127        _children: Vec<Arc<dyn ExecutionPlan>>,
128    ) -> DFResult<Arc<dyn ExecutionPlan>> {
129        Ok(self)
130    }
131
132    fn properties(&self) -> &PlanProperties {
133        &self.plan_properties
134    }
135
136    fn execute(
137        &self,
138        _partition: usize,
139        _context: Arc<TaskContext>,
140    ) -> DFResult<SendableRecordBatchStream> {
141        let fut = get_batch_stream(
142            self.table.clone(),
143            self.snapshot_id,
144            self.projection.clone(),
145            self.predicates.clone(),
146        );
147        let stream = futures::stream::once(fut).try_flatten();
148
149        Ok(Box::pin(RecordBatchStreamAdapter::new(
150            self.schema(),
151            stream,
152        )))
153    }
154}
155
156impl DisplayAs for IcebergTableScan {
157    fn fmt_as(
158        &self,
159        _t: datafusion::physical_plan::DisplayFormatType,
160        f: &mut std::fmt::Formatter,
161    ) -> std::fmt::Result {
162        write!(
163            f,
164            "IcebergTableScan projection:[{}] predicate:[{}]",
165            self.projection
166                .clone()
167                .map_or(String::new(), |v| v.join(",")),
168            self.predicates
169                .clone()
170                .map_or(String::from(""), |p| format!("{}", p))
171        )
172    }
173}
174
175/// Asynchronously retrieves a stream of [`RecordBatch`] instances
176/// from a given table.
177///
178/// This function initializes a [`TableScan`], builds it,
179/// and then converts it into a stream of Arrow [`RecordBatch`]es.
180async fn get_batch_stream(
181    table: Table,
182    snapshot_id: Option<i64>,
183    column_names: Option<Vec<String>>,
184    predicates: Option<Predicate>,
185) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
186    let scan_builder = match snapshot_id {
187        Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
188        None => table.scan(),
189    };
190
191    let mut scan_builder = match column_names {
192        Some(column_names) => scan_builder.select(column_names),
193        None => scan_builder.select_all(),
194    };
195    if let Some(pred) = predicates {
196        scan_builder = scan_builder.with_filter(pred);
197    }
198    let table_scan = scan_builder.build().map_err(to_datafusion_error)?;
199
200    let stream = table_scan
201        .to_arrow()
202        .await
203        .map_err(to_datafusion_error)?
204        .map_err(to_datafusion_error);
205    Ok(Box::pin(stream))
206}
207
208fn get_column_names(
209    schema: ArrowSchemaRef,
210    projection: Option<&Vec<usize>>,
211) -> Option<Vec<String>> {
212    projection.map(|v| {
213        v.iter()
214            .map(|p| schema.field(*p).name().clone())
215            .collect::<Vec<String>>()
216    })
217}