paimon_datafusion/physical_plan/
scan.rs1use std::any::Any;
19use std::sync::Arc;
20
21use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
22use datafusion::common::stats::Precision;
23use datafusion::common::Statistics;
24use datafusion::error::Result as DFResult;
25use datafusion::execution::{SendableRecordBatchStream, TaskContext};
26use datafusion::physical_expr::EquivalenceProperties;
27use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
28use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
29use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
30use futures::{StreamExt, TryStreamExt};
31use paimon::spec::Predicate;
32use paimon::table::Table;
33use paimon::DataSplit;
34
35use crate::error::to_datafusion_error;
36
37#[derive(Debug)]
43pub struct PaimonTableScan {
44 table: Table,
45 projected_columns: Option<Vec<String>>,
47 pushed_predicate: Option<Predicate>,
50 planned_partitions: Vec<Arc<[DataSplit]>>,
54 plan_properties: Arc<PlanProperties>,
55 limit: Option<usize>,
57}
58
59impl PaimonTableScan {
60 pub(crate) fn new(
61 schema: ArrowSchemaRef,
62 table: Table,
63 projected_columns: Option<Vec<String>>,
64 pushed_predicate: Option<Predicate>,
65 planned_partitions: Vec<Arc<[DataSplit]>>,
66 limit: Option<usize>,
67 ) -> Self {
68 let plan_properties = Arc::new(PlanProperties::new(
69 EquivalenceProperties::new(schema.clone()),
70 Partitioning::UnknownPartitioning(planned_partitions.len()),
71 EmissionType::Incremental,
72 Boundedness::Bounded,
73 ));
74 Self {
75 table,
76 projected_columns,
77 pushed_predicate,
78 planned_partitions,
79 plan_properties,
80 limit,
81 }
82 }
83
84 pub fn table(&self) -> &Table {
85 &self.table
86 }
87
88 #[cfg(test)]
89 pub(crate) fn planned_partitions(&self) -> &[Arc<[DataSplit]>] {
90 &self.planned_partitions
91 }
92
93 #[cfg(test)]
94 pub(crate) fn pushed_predicate(&self) -> Option<&Predicate> {
95 self.pushed_predicate.as_ref()
96 }
97
98 pub fn limit(&self) -> Option<usize> {
99 self.limit
100 }
101}
102
103impl ExecutionPlan for PaimonTableScan {
104 fn name(&self) -> &str {
105 "PaimonTableScan"
106 }
107
108 fn as_any(&self) -> &dyn Any {
109 self
110 }
111
112 fn properties(&self) -> &Arc<PlanProperties> {
113 &self.plan_properties
114 }
115
116 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan + 'static>> {
117 vec![]
118 }
119
120 fn with_new_children(
121 self: Arc<Self>,
122 _children: Vec<Arc<dyn ExecutionPlan>>,
123 ) -> DFResult<Arc<dyn ExecutionPlan>> {
124 Ok(self)
125 }
126
127 fn execute(
128 &self,
129 partition: usize,
130 _context: Arc<TaskContext>,
131 ) -> DFResult<SendableRecordBatchStream> {
132 let splits = Arc::clone(self.planned_partitions.get(partition).ok_or_else(|| {
133 datafusion::error::DataFusionError::Internal(format!(
134 "PaimonTableScan: partition index {partition} out of range (total {})",
135 self.planned_partitions.len()
136 ))
137 })?);
138
139 let table = self.table.clone();
140 let schema = self.schema();
141 let projected_columns = self.projected_columns.clone();
142 let pushed_predicate = self.pushed_predicate.clone();
143
144 let fut = async move {
145 let mut read_builder = table.new_read_builder();
146
147 if let Some(ref columns) = projected_columns {
148 let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
149 read_builder.with_projection(&col_refs);
150 }
151 if let Some(filter) = pushed_predicate {
152 read_builder.with_filter(filter);
153 }
154
155 let read = read_builder.new_read().map_err(to_datafusion_error)?;
156 let stream = read.to_arrow(&splits).map_err(to_datafusion_error)?;
157 let stream = stream.map(|r| r.map_err(to_datafusion_error));
158
159 Ok::<_, datafusion::error::DataFusionError>(RecordBatchStreamAdapter::new(
160 schema,
161 Box::pin(stream),
162 ))
163 };
164
165 Ok(Box::pin(RecordBatchStreamAdapter::new(
166 self.schema(),
167 futures::stream::once(fut).try_flatten(),
168 )))
169 }
170
171 fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
172 let partitions: &[Arc<[DataSplit]>] = match partition {
173 Some(idx) => std::slice::from_ref(&self.planned_partitions[idx]),
174 None => &self.planned_partitions,
175 };
176
177 let mut total_rows: usize = 0;
178 let mut total_bytes: usize = 0;
179 for splits in partitions {
180 for split in splits.iter() {
181 total_rows += split.merged_row_count().unwrap_or(split.row_count()) as usize;
182 for file in split.data_files() {
183 total_bytes += file.file_size as usize;
184 }
185 }
186 }
187
188 Ok(Statistics {
189 num_rows: Precision::Inexact(total_rows),
190 total_byte_size: Precision::Inexact(total_bytes),
191 column_statistics: Statistics::unknown_column(&self.schema()),
192 })
193 }
194}
195
196impl DisplayAs for PaimonTableScan {
197 fn fmt_as(
198 &self,
199 _t: datafusion::physical_plan::DisplayFormatType,
200 f: &mut std::fmt::Formatter,
201 ) -> std::fmt::Result {
202 write!(f, "PaimonTableScan: table={}", self.table.identifier())?;
203
204 let total_splits: usize = self.planned_partitions.iter().map(|p| p.len()).sum();
205 let total_files: usize = self
206 .planned_partitions
207 .iter()
208 .flat_map(|p| p.iter())
209 .map(|s| s.data_files().len())
210 .sum();
211 write!(
212 f,
213 ", partitions={}, splits={total_splits}, files={total_files}",
214 self.planned_partitions.len()
215 )?;
216
217 if let Some(ref columns) = self.projected_columns {
218 write!(f, ", projection=[{}]", columns.join(", "))?;
219 }
220 if let Some(ref predicate) = self.pushed_predicate {
221 write!(f, ", predicate={predicate}")?;
222 }
223 if let Some(limit) = self.limit {
224 write!(f, ", limit={limit}")?;
225 }
226 Ok(())
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    mod test_utils {
        include!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../test_utils.rs"));
    }

    use datafusion::arrow::array::Int32Array;
    use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
    use datafusion::physical_plan::ExecutionPlan;
    use datafusion::prelude::SessionContext;
    use futures::TryStreamExt;
    use paimon::catalog::Identifier;
    use paimon::io::FileIOBuilder;
    use paimon::spec::{
        BinaryRow, DataType, Datum, IntType, PredicateBuilder, Schema as PaimonSchema, TableSchema,
    };
    use paimon::table::Table;
    use std::fs;
    use tempfile::tempdir;
    use test_utils::{local_file_path, test_data_file, write_int_parquet_file};

    /// Arrow schema with a single non-nullable `id: Int32` column.
    fn test_schema() -> ArrowSchemaRef {
        let id_field = Field::new("id", ArrowDataType::Int32, false);
        Arc::new(ArrowSchema::new(vec![id_field]))
    }

    /// Table with an empty schema; used by tests that never execute the scan.
    fn dummy_table() -> Table {
        let file_io = FileIOBuilder::new("file").build().unwrap();
        let empty_schema = PaimonSchema::builder().build().unwrap();
        Table::new(
            file_io,
            Identifier::new("test_db", "test_table"),
            "/tmp/test-table".to_string(),
            TableSchema::new(0, &empty_schema),
            None,
        )
    }

    /// Scan over `partition_count` planned partitions that each hold no splits.
    fn scan_over_empty_partitions(partition_count: usize) -> PaimonTableScan {
        let planned_partitions: Vec<Arc<[DataSplit]>> = (0..partition_count)
            .map(|_| Arc::<[DataSplit]>::from(Vec::new()))
            .collect();
        PaimonTableScan::new(
            test_schema(),
            dummy_table(),
            None,
            None,
            planned_partitions,
            None,
        )
    }

    #[test]
    fn test_partition_count_empty_plan() {
        let scan = scan_over_empty_partitions(1);
        assert_eq!(scan.properties().output_partitioning().partition_count(), 1);
    }

    #[test]
    fn test_partition_count_multiple_partitions() {
        let scan = scan_over_empty_partitions(3);
        assert_eq!(scan.properties().output_partitioning().partition_count(), 3);
    }

    /// Projects only `id` while filtering on `value >= 10`, and checks which
    /// ids come back from a real parquet file.
    #[tokio::test]
    async fn test_execute_applies_pushed_filter_during_read() {
        // Lay out a single-bucket table directory with one parquet data file.
        let tempdir = tempdir().unwrap();
        let table_path = local_file_path(tempdir.path());
        let bucket_dir = tempdir.path().join("bucket-0");
        fs::create_dir_all(&bucket_dir).unwrap();

        let data_path = bucket_dir.join("data.parquet");
        write_int_parquet_file(
            &data_path,
            vec![("id", vec![1, 2, 3, 4]), ("value", vec![5, 20, 30, 40])],
            Some(2),
        );
        let file_size = fs::metadata(&data_path).unwrap().len() as i64;

        let file_io = FileIOBuilder::new("file").build().unwrap();
        let paimon_schema = PaimonSchema::builder()
            .column("id", DataType::Int(IntType::new()))
            .column("value", DataType::Int(IntType::new()))
            .build()
            .unwrap();
        let table = Table::new(
            file_io,
            Identifier::new("default", "t"),
            table_path,
            TableSchema::new(0, &paimon_schema),
            None,
        );

        let split = paimon::DataSplitBuilder::new()
            .with_snapshot(1)
            .with_partition(BinaryRow::new(0))
            .with_bucket(0)
            .with_bucket_path(local_file_path(&bucket_dir))
            .with_total_buckets(1)
            .with_data_files(vec![test_data_file("data.parquet", 4, file_size)])
            .with_raw_convertible(true)
            .build()
            .unwrap();

        let pushed_predicate = PredicateBuilder::new(table.schema().fields())
            .greater_or_equal("value", Datum::Int(10))
            .unwrap();

        let scan = PaimonTableScan::new(
            test_schema(),
            table,
            Some(vec!["id".to_string()]),
            Some(pushed_predicate),
            vec![Arc::from(vec![split])],
            None,
        );

        let ctx = SessionContext::new();
        let stream = scan
            .execute(0, ctx.task_ctx())
            .expect("execute should succeed");
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        // Gather the projected `id` values across all returned batches, in order.
        let mut actual_ids: Vec<i32> = Vec::new();
        for batch in &batches {
            let ids = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int32Array>()
                .expect("id column should be Int32Array");
            actual_ids.extend((0..ids.len()).map(|row| ids.value(row)));
        }

        assert_eq!(actual_ids, vec![2, 3, 4]);
    }
}