1use std::any::Any;
2use std::ops::Range;
3use std::sync::Arc;
4
5use arrow::array::{RecordBatch, RecordBatchOptions};
6use arrow::datatypes::{Schema, SchemaRef};
7use datafusion_common::stats::Precision;
8use datafusion_common::{DFSchema, DataFusionError, Statistics, project_schema};
9use datafusion_execution::{SendableRecordBatchStream, TaskContext};
10use datafusion_expr::Expr;
11use datafusion_physical_expr::EquivalenceProperties;
12use datafusion_physical_plan::display::ProjectSchemaDisplay;
13use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
14use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
15use datafusion_physical_plan::{
16 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
17};
18use futures::{StreamExt, TryStreamExt};
19use indexlake::catalog::DataFileRecord;
20use indexlake::table::{Table, TableScan, TableScanPartition};
21use log::error;
22
23use crate::{LazyTable, datafusion_expr_to_indexlake_expr};
24
/// Physical plan node that scans an IndexLake table.
#[derive(Debug)]
pub struct IndexLakeScanExec {
    /// Lazily-loaded handle to the underlying IndexLake table.
    pub lazy_table: LazyTable,
    /// Full (unprojected) output schema of the table.
    pub output_schema: SchemaRef,
    /// Number of output partitions this scan produces.
    pub partition_count: usize,
    /// Known data files to scan, if resolved at planning time; `None` means
    /// the table decides how to partition the scan itself.
    pub data_files: Option<Arc<Vec<DataFileRecord>>>,
    /// Column indices into `output_schema` to project, if any.
    pub projection: Option<Vec<usize>>,
    /// Filter expressions pushed down into the scan.
    pub filters: Vec<Expr>,
    /// Maximum number of rows per emitted record batch.
    pub batch_size: usize,
    /// Optional row limit pushed down into the scan.
    pub limit: Option<usize>,
    /// Per-partition slice of `data_files` (`None` entry = partition has no
    /// files); only present when `data_files` is present.
    pub data_file_partition_ranges: Option<Vec<Option<Range<usize>>>>,
    // Cached plan properties (schema, partitioning, boundedness).
    properties: PlanProperties,
}
38
39impl IndexLakeScanExec {
40 #[allow(clippy::too_many_arguments)]
41 pub fn try_new(
42 lazy_table: LazyTable,
43 output_schema: SchemaRef,
44 partition_count: usize,
45 data_files: Option<Arc<Vec<DataFileRecord>>>,
46 projection: Option<Vec<usize>>,
47 filters: Vec<Expr>,
48 batch_size: usize,
49 limit: Option<usize>,
50 ) -> Result<Self, DataFusionError> {
51 let projected_schema = project_schema(&output_schema, projection.as_ref())?;
52 let properties = PlanProperties::new(
53 EquivalenceProperties::new(projected_schema),
54 Partitioning::UnknownPartitioning(partition_count),
55 EmissionType::Incremental,
56 Boundedness::Bounded,
57 );
58 let data_file_partition_ranges = data_files
59 .as_ref()
60 .map(|files| calc_data_file_partition_ranges(partition_count, files.len()));
61 Ok(Self {
62 lazy_table,
63 output_schema,
64 partition_count,
65 data_files,
66 projection,
67 filters,
68 batch_size,
69 limit,
70 data_file_partition_ranges,
71 properties,
72 })
73 }
74
75 pub fn get_scan_partition(&self, partition: Option<usize>) -> TableScanPartition {
76 match partition {
77 Some(partition) => {
78 if let Some(data_files) = self.data_files.as_ref()
79 && let Some(data_file_partition_ranges) =
80 self.data_file_partition_ranges.as_ref()
81 {
82 let range = data_file_partition_ranges[partition].clone();
83 TableScanPartition::Provided {
84 contains_inline_rows: partition == 0,
85 data_file_records: if let Some(range) = range {
86 data_files[range].to_vec()
87 } else {
88 vec![]
89 },
90 }
91 } else {
92 TableScanPartition::Auto {
93 partition_idx: partition,
94 partition_count: self.partition_count,
95 }
96 }
97 }
98 None => TableScanPartition::single_partition(),
99 }
100 }
101}
102
impl ExecutionPlan for IndexLakeScanExec {
    fn name(&self) -> &str {
        "IndexLakeScanExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    // Leaf node: the scan reads directly from the table and has no inputs.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        // No children to replace, so the node is returned unchanged.
        Ok(self)
    }

    /// Starts the scan for one partition, returning a stream of record
    /// batches. Table loading and scan setup are deferred to first poll.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream, DataFusionError> {
        // Fail fast on an out-of-range partition index.
        if partition >= self.partition_count {
            return Err(DataFusionError::Execution(format!(
                "partition index out of range: {partition} >= {}",
                self.partition_count
            )));
        }

        // Translate the pushed-down DataFusion filter expressions into
        // IndexLake expressions, resolving columns against the full
        // (unprojected) schema.
        let df_schema = DFSchema::try_from(self.output_schema.as_ref().clone())?;
        let il_filters = self
            .filters
            .iter()
            .map(|f| datafusion_expr_to_indexlake_expr(f, &df_schema))
            .collect::<Result<Vec<_>, _>>()?;

        let scan_partition = self.get_scan_partition(Some(partition));

        let scan = TableScan::default()
            .with_projection(self.projection.clone())
            .with_filters(il_filters)
            .with_batch_size(self.batch_size)
            .with_partition(scan_partition)
            .with_limit(self.limit);

        let projected_schema = self.schema();
        let lazy_table = self.lazy_table.clone();

        // Wrap the async setup (table load + scan start) in a one-shot
        // stream and flatten it, so no work happens until first poll.
        let fut = async move {
            let table = lazy_table
                .get_or_load()
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?;
            get_batch_stream(table, projected_schema.clone(), scan).await
        };
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema(),
            stream,
        )))
    }

    /// Reports row-count statistics for a partition (or the whole scan when
    /// `partition` is `None`) by asking the table for an actual count.
    ///
    /// NOTE(review): `block_in_place` + `block_on` blocks the calling thread
    /// and panics on a current-thread tokio runtime — confirm this path is
    /// only reached from a multi-threaded runtime.
    fn partition_statistics(
        &self,
        partition: Option<usize>,
    ) -> Result<Statistics, DataFusionError> {
        let scan_partition = self.get_scan_partition(partition);
        let lazy_table = self.lazy_table.clone();

        let row_count_result = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                let table = lazy_table.get_or_load().await?;
                table.count(scan_partition).await
            })
        });
        match row_count_result {
            Ok(row_count) => {
                if self.filters.is_empty() {
                    // No filters: the count is the real output row count,
                    // capped by the limit when one was pushed down.
                    if let Some(limit) = self.limit {
                        Ok(Statistics {
                            num_rows: Precision::Exact(std::cmp::min(row_count, limit)),
                            total_byte_size: Precision::Absent,
                            column_statistics: Statistics::unknown_column(&self.schema()),
                        })
                    } else {
                        Ok(Statistics {
                            num_rows: Precision::Exact(row_count),
                            total_byte_size: Precision::Absent,
                            column_statistics: Statistics::unknown_column(&self.schema()),
                        })
                    }
                } else {
                    // Filters will reduce the output, so the count is only an
                    // upper bound — report it as inexact.
                    Ok(Statistics {
                        num_rows: Precision::Inexact(row_count),
                        total_byte_size: Precision::Absent,
                        column_statistics: Statistics::unknown_column(&self.schema()),
                    })
                }
            }
            Err(e) => Err(DataFusionError::Plan(format!(
                "Error getting indexlake table {}.{} row count: {:?}",
                self.lazy_table.namespace_name, self.lazy_table.table_name, e
            ))),
        }
    }

    /// Rebuilds the node with a new fetch limit so DataFusion can push
    /// `LIMIT` down into the scan; returns `None` if reconstruction fails.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        match IndexLakeScanExec::try_new(
            self.lazy_table.clone(),
            self.output_schema.clone(),
            self.partition_count,
            self.data_files.clone(),
            self.projection.clone(),
            self.filters.clone(),
            self.batch_size,
            limit,
        ) {
            Ok(exec) => Some(Arc::new(exec)),
            Err(e) => {
                error!("[indexlake] Failed to create IndexLakeScanExec with fetch: {e}");
                None
            }
        }
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }
}
239
240impl DisplayAs for IndexLakeScanExec {
241 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
242 write!(
243 f,
244 "IndexLakeScanExec: table={}.{}, partitions={}",
245 self.lazy_table.namespace_name, self.lazy_table.table_name, self.partition_count
246 )?;
247 let projected_schema = self.schema();
248 if !schema_projection_equals(&projected_schema, &self.output_schema) {
249 write!(
250 f,
251 ", projection={}",
252 ProjectSchemaDisplay(&projected_schema)
253 )?;
254 }
255 if !self.filters.is_empty() {
256 write!(
257 f,
258 ", filters=[{}]",
259 self.filters
260 .iter()
261 .map(|f| f.to_string())
262 .collect::<Vec<_>>()
263 .join(", ")
264 )?;
265 }
266 if let Some(limit) = self.limit {
267 write!(f, ", limit={limit}")?;
268 }
269 Ok(())
270 }
271}
272
273fn schema_projection_equals(left: &Schema, right: &Schema) -> bool {
274 if left.fields.len() != right.fields.len() {
275 return false;
276 }
277 for (left_field, right_field) in left.fields.iter().zip(right.fields.iter()) {
278 if left_field.name() != right_field.name() {
279 return false;
280 }
281 }
282 true
283}
284
285async fn get_batch_stream(
286 table: Arc<Table>,
287 projected_schema: SchemaRef,
288 mut scan: TableScan,
289) -> Result<SendableRecordBatchStream, DataFusionError> {
290 let stream = if scan.projection == Some(Vec::new()) {
291 scan.projection = Some(vec![0]);
292 let stream = table
293 .scan(scan)
294 .await
295 .map_err(|e| DataFusionError::Execution(e.to_string()))?;
296 stream
297 .map(|batch| {
298 let batch = batch?;
299 let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
300 let new_batch =
301 RecordBatch::try_new_with_options(Arc::new(Schema::empty()), vec![], &options)?;
302 Ok(new_batch)
303 })
304 .boxed()
305 } else {
306 table
307 .scan(scan)
308 .await
309 .map_err(|e| DataFusionError::Execution(e.to_string()))?
310 };
311 let stream = stream.map_err(|e| DataFusionError::Execution(e.to_string()));
312 Ok(Box::pin(RecordBatchStreamAdapter::new(
313 projected_schema,
314 stream,
315 )))
316}
317
/// Splits `data_file_count` files into `partition_count` contiguous ranges.
///
/// Files are distributed as evenly as possible: every partition receives
/// `data_file_count / partition_count` files, and the first
/// `data_file_count % partition_count` partitions receive one extra.
/// Partitions that receive no files get `None`. Ranges are contiguous and
/// cover `0..data_file_count` exactly.
///
/// Returns an empty vector when `partition_count` is zero (the previous
/// implementation divided by zero in that case).
fn calc_data_file_partition_ranges(
    partition_count: usize,
    data_file_count: usize,
) -> Vec<Option<Range<usize>>> {
    if partition_count == 0 {
        return Vec::new();
    }

    // Even split. When partition_count > data_file_count, `base` is 0 and the
    // first `extra` partitions get exactly one file each — this subsumes the
    // old special-case branch for that situation.
    let base = data_file_count / partition_count;
    let extra = data_file_count % partition_count;

    let mut ranges = Vec::with_capacity(partition_count);
    let mut start = 0usize;
    for idx in 0..partition_count {
        let len = base + usize::from(idx < extra);
        if len == 0 {
            ranges.push(None);
        } else {
            ranges.push(Some(start..start + len));
            start += len;
        }
    }
    ranges
}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies that data files are split evenly across partitions and that
    /// partitions without any files are marked `None`.
    #[test]
    fn test_partition_data_file_range() {
        // No files: every partition is empty.
        let ranges = calc_data_file_partition_ranges(2, 0);
        assert_eq!(ranges, vec![None, None]);

        // Fewer files than partitions: trailing partitions stay empty.
        let ranges = calc_data_file_partition_ranges(2, 1);
        assert_eq!(ranges, vec![Some(0..1), None]);

        // Exact fit: one file per partition.
        let ranges = calc_data_file_partition_ranges(2, 2);
        assert_eq!(ranges, vec![Some(0..1), Some(1..2)]);

        // Uneven split: earlier partitions get the extra file.
        let ranges = calc_data_file_partition_ranges(2, 3);
        assert_eq!(ranges, vec![Some(0..2), Some(2..3)]);

        // Even multiple: equal contiguous ranges.
        let ranges = calc_data_file_partition_ranges(2, 4);
        assert_eq!(ranges, vec![Some(0..2), Some(2..4)]);
    }
}