// indexlake_datafusion/scan.rs

1use std::any::Any;
2use std::ops::Range;
3use std::sync::Arc;
4
5use arrow::array::{RecordBatch, RecordBatchOptions};
6use arrow::datatypes::{Schema, SchemaRef};
7use datafusion_common::stats::Precision;
8use datafusion_common::{DFSchema, DataFusionError, Statistics, project_schema};
9use datafusion_execution::{SendableRecordBatchStream, TaskContext};
10use datafusion_expr::Expr;
11use datafusion_physical_expr::EquivalenceProperties;
12use datafusion_physical_plan::display::ProjectSchemaDisplay;
13use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
14use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
15use datafusion_physical_plan::{
16    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
17};
18use futures::{StreamExt, TryStreamExt};
19use indexlake::catalog::DataFileRecord;
20use indexlake::table::{Table, TableScan, TableScanPartition};
21use log::error;
22
23use crate::{LazyTable, datafusion_expr_to_indexlake_expr};
24
/// Physical plan node that scans an IndexLake table.
///
/// The scan is split into `partition_count` DataFusion partitions. When the
/// data file list is known up front, files are statically assigned to
/// partitions via `data_file_partition_ranges`.
#[derive(Debug)]
pub struct IndexLakeScanExec {
    // Table handle that is loaded lazily on first use (see `execute`).
    pub lazy_table: LazyTable,
    // Full (unprojected) output schema of the table.
    pub output_schema: SchemaRef,
    // Number of DataFusion partitions this scan is split into.
    pub partition_count: usize,
    // Optional pre-resolved data file records; when present, files are
    // statically assigned to partitions via `data_file_partition_ranges`.
    pub data_files: Option<Arc<Vec<DataFileRecord>>>,
    // Column indices into `output_schema` to project, if any.
    pub projection: Option<Vec<usize>>,
    // Filter expressions pushed down into the scan.
    pub filters: Vec<Expr>,
    // Target number of rows per emitted record batch.
    pub batch_size: usize,
    // Optional row-count limit (fetch) applied to the scan.
    pub limit: Option<usize>,
    // Per-partition index ranges into `data_files`; a `None` entry means that
    // partition receives no data files.
    pub data_file_partition_ranges: Option<Vec<Option<Range<usize>>>>,
    // Cached plan properties (projected schema, partitioning, emission type,
    // boundedness), computed once in `try_new`.
    properties: PlanProperties,
}
38
39impl IndexLakeScanExec {
40    #[allow(clippy::too_many_arguments)]
41    pub fn try_new(
42        lazy_table: LazyTable,
43        output_schema: SchemaRef,
44        partition_count: usize,
45        data_files: Option<Arc<Vec<DataFileRecord>>>,
46        projection: Option<Vec<usize>>,
47        filters: Vec<Expr>,
48        batch_size: usize,
49        limit: Option<usize>,
50    ) -> Result<Self, DataFusionError> {
51        let projected_schema = project_schema(&output_schema, projection.as_ref())?;
52        let properties = PlanProperties::new(
53            EquivalenceProperties::new(projected_schema),
54            Partitioning::UnknownPartitioning(partition_count),
55            EmissionType::Incremental,
56            Boundedness::Bounded,
57        );
58        let data_file_partition_ranges = data_files
59            .as_ref()
60            .map(|files| calc_data_file_partition_ranges(partition_count, files.len()));
61        Ok(Self {
62            lazy_table,
63            output_schema,
64            partition_count,
65            data_files,
66            projection,
67            filters,
68            batch_size,
69            limit,
70            data_file_partition_ranges,
71            properties,
72        })
73    }
74
75    pub fn get_scan_partition(&self, partition: Option<usize>) -> TableScanPartition {
76        match partition {
77            Some(partition) => {
78                if let Some(data_files) = self.data_files.as_ref()
79                    && let Some(data_file_partition_ranges) =
80                        self.data_file_partition_ranges.as_ref()
81                {
82                    let range = data_file_partition_ranges[partition].clone();
83                    TableScanPartition::Provided {
84                        contains_inline_rows: partition == 0,
85                        data_file_records: if let Some(range) = range {
86                            data_files[range].to_vec()
87                        } else {
88                            vec![]
89                        },
90                    }
91                } else {
92                    TableScanPartition::Auto {
93                        partition_idx: partition,
94                        partition_count: self.partition_count,
95                    }
96                }
97            }
98            None => TableScanPartition::single_partition(),
99        }
100    }
101}
102
impl ExecutionPlan for IndexLakeScanExec {
    fn name(&self) -> &str {
        "IndexLakeScanExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    // Leaf node: a scan has no child plans.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    // With no children, the plan is returned unchanged.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        Ok(self)
    }

    /// Starts the scan for one partition and returns its record batch stream.
    ///
    /// The table handle is loaded lazily inside the returned stream, so no
    /// I/O happens until the stream is first polled.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream, DataFusionError> {
        // Reject out-of-range partition indices up front.
        if partition >= self.partition_count {
            return Err(DataFusionError::Execution(format!(
                "partition index out of range: {partition} >= {}",
                self.partition_count
            )));
        }

        // Translate the pushed-down DataFusion filters into IndexLake
        // expressions against the full (unprojected) schema.
        let df_schema = DFSchema::try_from(self.output_schema.as_ref().clone())?;
        let il_filters = self
            .filters
            .iter()
            .map(|f| datafusion_expr_to_indexlake_expr(f, &df_schema))
            .collect::<Result<Vec<_>, _>>()?;

        let scan_partition = self.get_scan_partition(Some(partition));

        let scan = TableScan::default()
            .with_projection(self.projection.clone())
            .with_filters(il_filters)
            .with_batch_size(self.batch_size)
            .with_partition(scan_partition)
            .with_limit(self.limit);

        let projected_schema = self.schema();
        let lazy_table = self.lazy_table.clone();

        // Defer table loading and scan start until the stream is polled;
        // `once(..).try_flatten()` turns the future-of-stream into a stream.
        let fut = async move {
            let table = lazy_table
                .get_or_load()
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?;
            get_batch_stream(table, projected_schema.clone(), scan).await
        };
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema(),
            stream,
        )))
    }

    /// Reports row-count statistics for one partition (or the whole scan when
    /// `partition` is `None`) by asking the table for a count.
    ///
    /// Row counts are `Exact` when there are no filters (capped by `limit` if
    /// set) and `Inexact` otherwise, since the count ignores filters.
    fn partition_statistics(
        &self,
        partition: Option<usize>,
    ) -> Result<Statistics, DataFusionError> {
        let scan_partition = self.get_scan_partition(partition);
        let lazy_table = self.lazy_table.clone();

        // Bridge sync trait method to async table API.
        // NOTE(review): `block_in_place` panics on a current-thread tokio
        // runtime — confirm callers always run on a multi-thread runtime.
        let row_count_result = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                let table = lazy_table.get_or_load().await?;
                table.count(scan_partition).await
            })
        });
        match row_count_result {
            Ok(row_count) => {
                if self.filters.is_empty() {
                    if let Some(limit) = self.limit {
                        // The scan stops at `limit`, so the exact row count is
                        // the smaller of the two.
                        Ok(Statistics {
                            num_rows: Precision::Exact(std::cmp::min(row_count, limit)),
                            total_byte_size: Precision::Absent,
                            column_statistics: Statistics::unknown_column(&self.schema()),
                        })
                    } else {
                        Ok(Statistics {
                            num_rows: Precision::Exact(row_count),
                            total_byte_size: Precision::Absent,
                            column_statistics: Statistics::unknown_column(&self.schema()),
                        })
                    }
                } else {
                    // Filters will reduce the count by an unknown amount.
                    Ok(Statistics {
                        num_rows: Precision::Inexact(row_count),
                        total_byte_size: Precision::Absent,
                        column_statistics: Statistics::unknown_column(&self.schema()),
                    })
                }
            }
            Err(e) => Err(DataFusionError::Plan(format!(
                "Error getting indexlake table {}.{} row count: {:?}",
                self.lazy_table.namespace_name, self.lazy_table.table_name, e
            ))),
        }
    }

    /// Supports limit pushdown: rebuilds the plan with a new fetch limit.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        match IndexLakeScanExec::try_new(
            self.lazy_table.clone(),
            self.output_schema.clone(),
            self.partition_count,
            self.data_files.clone(),
            self.projection.clone(),
            self.filters.clone(),
            self.batch_size,
            limit,
        ) {
            Ok(exec) => Some(Arc::new(exec)),
            Err(e) => {
                error!("[indexlake] Failed to create IndexLakeScanExec with fetch: {e}");
                None
            }
        }
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }
}
239
240impl DisplayAs for IndexLakeScanExec {
241    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
242        write!(
243            f,
244            "IndexLakeScanExec: table={}.{}, partitions={}",
245            self.lazy_table.namespace_name, self.lazy_table.table_name, self.partition_count
246        )?;
247        let projected_schema = self.schema();
248        if !schema_projection_equals(&projected_schema, &self.output_schema) {
249            write!(
250                f,
251                ", projection={}",
252                ProjectSchemaDisplay(&projected_schema)
253            )?;
254        }
255        if !self.filters.is_empty() {
256            write!(
257                f,
258                ", filters=[{}]",
259                self.filters
260                    .iter()
261                    .map(|f| f.to_string())
262                    .collect::<Vec<_>>()
263                    .join(", ")
264            )?;
265        }
266        if let Some(limit) = self.limit {
267            write!(f, ", limit={limit}")?;
268        }
269        Ok(())
270    }
271}
272
273fn schema_projection_equals(left: &Schema, right: &Schema) -> bool {
274    if left.fields.len() != right.fields.len() {
275        return false;
276    }
277    for (left_field, right_field) in left.fields.iter().zip(right.fields.iter()) {
278        if left_field.name() != right_field.name() {
279            return false;
280        }
281    }
282    true
283}
284
/// Starts the table scan and adapts its output into a DataFusion
/// `SendableRecordBatchStream` carrying `projected_schema`.
///
/// An empty projection (`Some(vec![])`) cannot be scanned directly, so the
/// scan is rewritten to read column 0 and each batch is replaced by an
/// empty-schema batch that preserves only the row count — presumably to serve
/// count-style queries that need no columns (TODO confirm against callers).
async fn get_batch_stream(
    table: Arc<Table>,
    projected_schema: SchemaRef,
    mut scan: TableScan,
) -> Result<SendableRecordBatchStream, DataFusionError> {
    let stream = if scan.projection == Some(Vec::new()) {
        // Scan a single column so the underlying scan has something to read.
        scan.projection = Some(vec![0]);
        let stream = table
            .scan(scan)
            .await
            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
        stream
            .map(|batch| {
                let batch = batch?;
                // Discard the column data; keep only the row count via
                // `RecordBatchOptions::with_row_count`.
                let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
                let new_batch =
                    RecordBatch::try_new_with_options(Arc::new(Schema::empty()), vec![], &options)?;
                Ok(new_batch)
            })
            .boxed()
    } else {
        table
            .scan(scan)
            .await
            .map_err(|e| DataFusionError::Execution(e.to_string()))?
    };
    // Convert IndexLake errors into DataFusion execution errors.
    let stream = stream.map_err(|e| DataFusionError::Execution(e.to_string()));
    Ok(Box::pin(RecordBatchStreamAdapter::new(
        projected_schema,
        stream,
    )))
}
317
/// Splits `data_file_count` files into `partition_count` contiguous index
/// ranges, as evenly as possible.
///
/// Every partition gets `data_file_count / partition_count` files, and the
/// first `data_file_count % partition_count` partitions get one extra (so
/// with fewer files than partitions, the leading partitions get one file each
/// and the rest get `None`).
///
/// Returns one entry per partition; `None` means that partition has no files.
/// Returns an empty vector when `partition_count` is zero (previously this
/// case panicked with a division by zero).
fn calc_data_file_partition_ranges(
    partition_count: usize,
    data_file_count: usize,
) -> Vec<Option<Range<usize>>> {
    // Guard: avoid `data_file_count / 0` below; zero partitions -> no ranges.
    if partition_count == 0 {
        return Vec::new();
    }

    let base = data_file_count / partition_count;
    let extra = data_file_count % partition_count;

    let mut ranges = Vec::with_capacity(partition_count);
    let mut start = 0usize;
    for idx in 0..partition_count {
        // The first `extra` partitions absorb the remainder, one file each.
        let len = base + usize::from(idx < extra);
        if len == 0 {
            ranges.push(None);
        } else {
            ranges.push(Some(start..start + len));
            start += len;
        }
    }
    ranges
}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies that files are split into contiguous, evenly balanced
    /// per-partition ranges, with `None` for partitions that get no files.
    #[test]
    fn test_partition_data_file_range() {
        let ranges = calc_data_file_partition_ranges(2, 0);
        assert_eq!(ranges, vec![None, None]);

        let ranges = calc_data_file_partition_ranges(2, 1);
        assert_eq!(ranges, vec![Some(0..1), None]);

        let ranges = calc_data_file_partition_ranges(2, 2);
        assert_eq!(ranges, vec![Some(0..1), Some(1..2)]);

        let ranges = calc_data_file_partition_ranges(2, 3);
        assert_eq!(ranges, vec![Some(0..2), Some(2..3)]);

        let ranges = calc_data_file_partition_ranges(2, 4);
        assert_eq!(ranges, vec![Some(0..2), Some(2..4)]);

        // More partitions than files: trailing partitions get `None`.
        let ranges = calc_data_file_partition_ranges(3, 1);
        assert_eq!(ranges, vec![Some(0..1), None, None]);

        // Remainder files go to the leading partitions.
        let ranges = calc_data_file_partition_ranges(3, 7);
        assert_eq!(ranges, vec![Some(0..3), Some(3..5), Some(5..7)]);
    }
}