Skip to main content

paimon_datafusion/physical_plan/
scan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
22use datafusion::common::stats::Precision;
23use datafusion::common::Statistics;
24use datafusion::error::Result as DFResult;
25use datafusion::execution::{SendableRecordBatchStream, TaskContext};
26use datafusion::physical_expr::EquivalenceProperties;
27use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
28use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
29use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
30use futures::{StreamExt, TryStreamExt};
31use paimon::spec::Predicate;
32use paimon::table::Table;
33use paimon::DataSplit;
34
35use crate::error::to_datafusion_error;
36
/// Execution plan that scans a Paimon table with optional column projection.
///
/// Planning is performed eagerly in [`super::super::table::PaimonTableProvider::scan`],
/// and the resulting splits are distributed across DataFusion execution partitions
/// so that DataFusion can schedule them in parallel.
///
/// The number of output partitions advertised to DataFusion is
/// `planned_partitions.len()`; it is fixed when the scan is constructed.
#[derive(Debug)]
pub struct PaimonTableScan {
    /// Handle to the Paimon table being scanned; cloned into each `execute()`
    /// call to build a reader.
    table: Table,
    /// Projected column names (if None, reads all columns).
    projected_columns: Option<Vec<String>>,
    /// Filter translated from DataFusion expressions and reused during execute()
    /// so reader-side pruning reaches the actual read path.
    pushed_predicate: Option<Predicate>,
    /// Pre-planned partition assignments: `planned_partitions[i]` contains the
    /// Paimon splits that DataFusion partition `i` will read.
    /// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`.
    planned_partitions: Vec<Arc<[DataSplit]>>,
    /// Cached plan properties (equivalences, partitioning, emission type,
    /// boundedness) handed out by `ExecutionPlan::properties`.
    plan_properties: Arc<PlanProperties>,
    /// Optional limit on the number of rows to return.
    ///
    /// NOTE(review): `execute()` does not read this field; it is only exposed
    /// through `limit()` and shown in the plan display. Presumably the limit is
    /// honoured during split planning or by a parent operator — confirm.
    limit: Option<usize>,
}
58
59impl PaimonTableScan {
60    pub(crate) fn new(
61        schema: ArrowSchemaRef,
62        table: Table,
63        projected_columns: Option<Vec<String>>,
64        pushed_predicate: Option<Predicate>,
65        planned_partitions: Vec<Arc<[DataSplit]>>,
66        limit: Option<usize>,
67    ) -> Self {
68        let plan_properties = Arc::new(PlanProperties::new(
69            EquivalenceProperties::new(schema.clone()),
70            Partitioning::UnknownPartitioning(planned_partitions.len()),
71            EmissionType::Incremental,
72            Boundedness::Bounded,
73        ));
74        Self {
75            table,
76            projected_columns,
77            pushed_predicate,
78            planned_partitions,
79            plan_properties,
80            limit,
81        }
82    }
83
84    pub fn table(&self) -> &Table {
85        &self.table
86    }
87
88    #[cfg(test)]
89    pub(crate) fn planned_partitions(&self) -> &[Arc<[DataSplit]>] {
90        &self.planned_partitions
91    }
92
93    #[cfg(test)]
94    pub(crate) fn pushed_predicate(&self) -> Option<&Predicate> {
95        self.pushed_predicate.as_ref()
96    }
97
98    pub fn limit(&self) -> Option<usize> {
99        self.limit
100    }
101}
102
103impl ExecutionPlan for PaimonTableScan {
104    fn name(&self) -> &str {
105        "PaimonTableScan"
106    }
107
108    fn as_any(&self) -> &dyn Any {
109        self
110    }
111
112    fn properties(&self) -> &Arc<PlanProperties> {
113        &self.plan_properties
114    }
115
116    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan + 'static>> {
117        vec![]
118    }
119
120    fn with_new_children(
121        self: Arc<Self>,
122        _children: Vec<Arc<dyn ExecutionPlan>>,
123    ) -> DFResult<Arc<dyn ExecutionPlan>> {
124        Ok(self)
125    }
126
127    fn execute(
128        &self,
129        partition: usize,
130        _context: Arc<TaskContext>,
131    ) -> DFResult<SendableRecordBatchStream> {
132        let splits = Arc::clone(self.planned_partitions.get(partition).ok_or_else(|| {
133            datafusion::error::DataFusionError::Internal(format!(
134                "PaimonTableScan: partition index {partition} out of range (total {})",
135                self.planned_partitions.len()
136            ))
137        })?);
138
139        let table = self.table.clone();
140        let schema = self.schema();
141        let projected_columns = self.projected_columns.clone();
142        let pushed_predicate = self.pushed_predicate.clone();
143
144        let fut = async move {
145            let mut read_builder = table.new_read_builder();
146
147            if let Some(ref columns) = projected_columns {
148                let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
149                read_builder.with_projection(&col_refs);
150            }
151            if let Some(filter) = pushed_predicate {
152                read_builder.with_filter(filter);
153            }
154
155            let read = read_builder.new_read().map_err(to_datafusion_error)?;
156            let stream = read.to_arrow(&splits).map_err(to_datafusion_error)?;
157            let stream = stream.map(|r| r.map_err(to_datafusion_error));
158
159            Ok::<_, datafusion::error::DataFusionError>(RecordBatchStreamAdapter::new(
160                schema,
161                Box::pin(stream),
162            ))
163        };
164
165        Ok(Box::pin(RecordBatchStreamAdapter::new(
166            self.schema(),
167            futures::stream::once(fut).try_flatten(),
168        )))
169    }
170
171    fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
172        let partitions: &[Arc<[DataSplit]>] = match partition {
173            Some(idx) => std::slice::from_ref(&self.planned_partitions[idx]),
174            None => &self.planned_partitions,
175        };
176
177        let mut total_rows: usize = 0;
178        let mut total_bytes: usize = 0;
179        for splits in partitions {
180            for split in splits.iter() {
181                total_rows += split.merged_row_count().unwrap_or(split.row_count()) as usize;
182                for file in split.data_files() {
183                    total_bytes += file.file_size as usize;
184                }
185            }
186        }
187
188        Ok(Statistics {
189            num_rows: Precision::Inexact(total_rows),
190            total_byte_size: Precision::Inexact(total_bytes),
191            column_statistics: Statistics::unknown_column(&self.schema()),
192        })
193    }
194}
195
196impl DisplayAs for PaimonTableScan {
197    fn fmt_as(
198        &self,
199        _t: datafusion::physical_plan::DisplayFormatType,
200        f: &mut std::fmt::Formatter,
201    ) -> std::fmt::Result {
202        write!(f, "PaimonTableScan: table={}", self.table.identifier())?;
203
204        let total_splits: usize = self.planned_partitions.iter().map(|p| p.len()).sum();
205        let total_files: usize = self
206            .planned_partitions
207            .iter()
208            .flat_map(|p| p.iter())
209            .map(|s| s.data_files().len())
210            .sum();
211        write!(
212            f,
213            ", partitions={}, splits={total_splits}, files={total_files}",
214            self.planned_partitions.len()
215        )?;
216
217        if let Some(ref columns) = self.projected_columns {
218            write!(f, ", projection=[{}]", columns.join(", "))?;
219        }
220        if let Some(ref predicate) = self.pushed_predicate {
221            write!(f, ", predicate={predicate}")?;
222        }
223        if let Some(limit) = self.limit {
224            write!(f, ", limit={limit}")?;
225        }
226        Ok(())
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    mod test_utils {
        include!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../test_utils.rs"));
    }

    use datafusion::arrow::array::Int32Array;
    use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
    use datafusion::physical_plan::ExecutionPlan;
    use datafusion::prelude::SessionContext;
    use futures::TryStreamExt;
    use paimon::catalog::Identifier;
    use paimon::io::FileIOBuilder;
    use paimon::spec::{
        BinaryRow, DataType, Datum, IntType, PredicateBuilder, Schema as PaimonSchema, TableSchema,
    };
    use paimon::table::Table;
    use std::fs;
    use tempfile::tempdir;
    use test_utils::{local_file_path, test_data_file, write_int_parquet_file};

    /// Arrow schema with a single non-nullable `id: Int32` column.
    fn test_schema() -> ArrowSchemaRef {
        let id_field = Field::new("id", ArrowDataType::Int32, false);
        Arc::new(ArrowSchema::new(vec![id_field]))
    }

    /// Constructs a minimal Table for testing (no real files needed since we
    /// only test PlanProperties, not actual reads).
    fn dummy_table() -> Table {
        let file_io = FileIOBuilder::new("file").build().unwrap();
        let schema = PaimonSchema::builder().build().unwrap();
        let table_schema = TableSchema::new(0, &schema);
        Table::new(
            file_io,
            Identifier::new("test_db", "test_table"),
            "/tmp/test-table".to_string(),
            table_schema,
            None,
        )
    }

    /// Produces `count` planned partitions that each hold no splits.
    fn empty_partitions(count: usize) -> Vec<Arc<[DataSplit]>> {
        (0..count)
            .map(|_| Arc::from(Vec::<DataSplit>::new()))
            .collect()
    }

    /// A plan with one empty partition still reports one output partition.
    #[test]
    fn test_partition_count_empty_plan() {
        let scan = PaimonTableScan::new(
            test_schema(),
            dummy_table(),
            None,
            None,
            empty_partitions(1),
            None,
        );
        assert_eq!(scan.properties().output_partitioning().partition_count(), 1);
    }

    /// The advertised partition count follows the number of planned partitions.
    #[test]
    fn test_partition_count_multiple_partitions() {
        let scan = PaimonTableScan::new(
            test_schema(),
            dummy_table(),
            None,
            None,
            empty_partitions(3),
            None,
        );
        assert_eq!(scan.properties().output_partitioning().partition_count(), 3);
    }

    /// The predicate handed to the scan must be applied by `execute()`, even
    /// when it references a column (`value`) that is not part of the projection.
    #[tokio::test]
    async fn test_execute_applies_pushed_filter_during_read() {
        // Lay out a single-bucket table directory containing one parquet file.
        let tmp = tempdir().unwrap();
        let table_path = local_file_path(tmp.path());
        let bucket_dir = tmp.path().join("bucket-0");
        fs::create_dir_all(&bucket_dir).unwrap();

        let data_file_path = bucket_dir.join("data.parquet");
        write_int_parquet_file(
            &data_file_path,
            vec![("id", vec![1, 2, 3, 4]), ("value", vec![5, 20, 30, 40])],
            Some(2),
        );
        let file_size = fs::metadata(&data_file_path).unwrap().len() as i64;

        // Table with two int columns backed by the local filesystem.
        let paimon_schema = PaimonSchema::builder()
            .column("id", DataType::Int(IntType::new()))
            .column("value", DataType::Int(IntType::new()))
            .build()
            .unwrap();
        let table = Table::new(
            FileIOBuilder::new("file").build().unwrap(),
            Identifier::new("default", "t"),
            table_path,
            TableSchema::new(0, &paimon_schema),
            None,
        );

        // One split that points at the parquet file written above.
        let split = paimon::DataSplitBuilder::new()
            .with_snapshot(1)
            .with_partition(BinaryRow::new(0))
            .with_bucket(0)
            .with_bucket_path(local_file_path(&bucket_dir))
            .with_total_buckets(1)
            .with_data_files(vec![test_data_file("data.parquet", 4, file_size)])
            .with_raw_convertible(true)
            .build()
            .unwrap();

        // value >= 10 excludes only the first row (id = 1, value = 5).
        let pushed_predicate = PredicateBuilder::new(table.schema().fields())
            .greater_or_equal("value", Datum::Int(10))
            .unwrap();

        let scan = PaimonTableScan::new(
            test_schema(),
            table,
            Some(vec!["id".to_string()]),
            Some(pushed_predicate),
            vec![Arc::from(vec![split])],
            None,
        );

        let ctx = SessionContext::new();
        let stream = scan
            .execute(0, ctx.task_ctx())
            .expect("execute should succeed");
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        // Flatten the `id` column of every returned batch into one vector.
        let mut actual_ids: Vec<i32> = Vec::new();
        for batch in &batches {
            let ids = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int32Array>()
                .expect("id column should be Int32Array");
            actual_ids.extend((0..ids.len()).map(|row| ids.value(row)));
        }

        assert_eq!(actual_ids, vec![2, 3, 4]);
    }
}