1use std::sync::Arc;
19
20use arrow::datatypes::{Schema, SchemaRef};
21use datafusion_common::{
22 Result, ScalarValue,
23 tree_node::{Transformed, TransformedResult, TreeNode},
24};
25use datafusion_physical_expr::{
26 expressions::{Column, Literal},
27 projection::{ProjectionExpr, ProjectionExprs},
28};
29use futures::{FutureExt, StreamExt};
30use itertools::Itertools;
31
32use crate::{
33 PartitionedFile, TableSchema,
34 file_stream::{FileOpenFuture, FileOpener},
35};
36
37pub struct ProjectionOpener {
48 inner: Arc<dyn FileOpener>,
49 projection: ProjectionExprs,
50 input_schema: SchemaRef,
51 partition_columns: Vec<PartitionColumnIndex>,
52}
53
54impl ProjectionOpener {
55 pub fn try_new(
56 projection: SplitProjection,
57 inner: Arc<dyn FileOpener>,
58 file_schema: &Schema,
59 ) -> Result<Arc<dyn FileOpener>> {
60 Ok(Arc::new(ProjectionOpener {
61 inner,
62 projection: projection.remapped_projection,
63 input_schema: Arc::new(file_schema.project(&projection.file_indices)?),
64 partition_columns: projection.partition_columns,
65 }))
66 }
67}
68
69impl FileOpener for ProjectionOpener {
70 fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
71 let partition_values = partitioned_file.partition_values.clone();
72 let projection = if self.partition_columns.is_empty() {
75 self.projection.clone()
76 } else {
77 inject_partition_columns_into_projection(
78 &self.projection,
79 &self.partition_columns,
80 partition_values,
81 )
82 };
83 let projector = projection.make_projector(&self.input_schema)?;
84
85 let inner = self.inner.open(partitioned_file)?;
86
87 Ok(async move {
88 let stream = inner.await?;
89 let stream = stream.map(move |batch| {
90 let batch = batch?;
91 let batch = projector.project_batch(&batch)?;
92 Ok(batch)
93 });
94 Ok(stream.boxed())
95 }
96 .boxed())
97 }
98}
99
/// Locates one partition column in two places: within a remapped projection,
/// and within a file's partition values.
///
/// Comparable with `==` so mappings can be checked directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionColumnIndex {
    /// Column index this partition column uses in the remapped projection.
    /// Column references with this index are replaced by a literal.
    pub in_remainder_projection: usize,
    /// Index of this column's value in the file's partition values.
    pub in_partition_values: usize,
}
107
108fn inject_partition_columns_into_projection(
109 projection: &ProjectionExprs,
110 partition_columns: &[PartitionColumnIndex],
111 partition_values: Vec<ScalarValue>,
112) -> ProjectionExprs {
113 let partition_literals: Vec<Arc<Literal>> = partition_values
115 .into_iter()
116 .map(|value| Arc::new(Literal::new(value)))
117 .collect();
118
119 let projections = projection
120 .iter()
121 .map(|projection| {
122 let expr = Arc::clone(&projection.expr)
123 .transform(|expr| {
124 let original_expr = Arc::clone(&expr);
125 if let Some(column) = expr.as_any().downcast_ref::<Column>() {
126 if let Some(pci) = partition_columns
128 .iter()
129 .find(|pci| pci.in_remainder_projection == column.index())
130 {
131 let literal =
132 Arc::clone(&partition_literals[pci.in_partition_values]);
133 return Ok(Transformed::yes(literal));
134 }
135 }
136 Ok(Transformed::no(original_expr))
137 })
138 .data()
139 .expect("infallible transform");
140 ProjectionExpr::new(expr, projection.alias.clone())
141 })
142 .collect_vec();
143 ProjectionExprs::new(projections)
144}
145
/// A projection split into the file columns that must be read and the
/// partition columns it references.
///
/// Built by `SplitProjection::new`. The source projection is indexed by table
/// column: indices below the file schema's column count are file columns, and
/// higher indices are partition columns.
#[derive(Debug, Clone)]
pub struct SplitProjection {
    /// The projection this split was built from, unchanged.
    pub source: ProjectionExprs,
    /// Distinct indices of the referenced file columns, sorted ascending.
    /// They are below the file schema's column count, so they index the file
    /// schema directly.
    pub file_indices: Vec<usize>,
    /// One entry per distinct referenced partition column, in ascending
    /// table-index order.
    pub(crate) partition_columns: Vec<PartitionColumnIndex>,
    /// `source` with every column reference renumbered. File columns take
    /// `0..file_indices.len()`; partition columns follow them.
    pub(crate) remapped_projection: ProjectionExprs,
}
169
170impl SplitProjection {
171 pub fn unprojected(table_schema: &TableSchema) -> Self {
172 let projection = ProjectionExprs::from_indices(
173 &(0..table_schema.table_schema().fields().len()).collect_vec(),
174 table_schema.table_schema(),
175 );
176 Self::new(table_schema.file_schema(), &projection)
177 }
178
179 pub fn new(logical_file_schema: &Schema, projection: &ProjectionExprs) -> Self {
190 let num_file_schema_columns = logical_file_schema.fields().len();
191
192 let mut file_columns = Vec::new();
194 let mut partition_columns = Vec::new();
195 let mut all_columns = std::collections::HashMap::new();
196
197 for proj_expr in projection {
199 proj_expr
200 .expr
201 .apply(|expr| {
202 if let Some(column) = expr.as_any().downcast_ref::<Column>() {
203 all_columns
204 .entry(column.index())
205 .or_insert_with(|| column.name().to_string());
206 }
207 Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
208 })
209 .expect("infallible apply");
210 }
211
212 let mut sorted_columns: Vec<_> = all_columns
214 .into_iter()
215 .map(|(idx, name)| (name, idx))
216 .collect();
217 sorted_columns.sort_by_key(|(_, idx)| *idx);
218
219 let mut column_mapping = std::collections::HashMap::new();
222 let mut file_idx = 0;
223 let mut partition_idx = 0;
224
225 for (name, original_index) in sorted_columns {
226 let new_index = if original_index < num_file_schema_columns {
227 file_columns.push(original_index);
229 let idx = file_idx;
230 file_idx += 1;
231 idx
232 } else {
233 partition_columns.push(original_index);
235 let idx = file_idx + partition_idx;
236 partition_idx += 1;
237 idx
238 };
239
240 let new_column: Arc<dyn datafusion_physical_plan::PhysicalExpr> =
242 Arc::new(Column::new(&name, new_index));
243 column_mapping.insert(original_index, new_column);
244 }
245
246 let remapped_projection = projection
248 .iter()
249 .map(|proj_expr| {
250 let expr = Arc::clone(&proj_expr.expr)
251 .transform(|expr| {
252 let original_expr = Arc::clone(&expr);
253 if let Some(column) = expr.as_any().downcast_ref::<Column>()
254 && let Some(new_column) = column_mapping.get(&column.index())
255 {
256 return Ok(Transformed::yes(Arc::clone(new_column)));
257 }
258 Ok(Transformed::no(original_expr))
259 })
260 .data()
261 .expect("infallible transform");
262 ProjectionExpr::new(expr, proj_expr.alias.clone())
263 })
264 .collect_vec();
265
266 let num_file_columns = file_columns.len();
268 let partition_column_mappings = partition_columns
269 .iter()
270 .enumerate()
271 .map(|(partition_idx, &table_index)| PartitionColumnIndex {
272 in_remainder_projection: num_file_columns + partition_idx,
273 in_partition_values: table_index - num_file_schema_columns,
274 })
275 .collect_vec();
276
277 Self {
278 source: projection.clone(),
279 file_indices: file_columns,
280 partition_columns: partition_column_mappings,
281 remapped_projection: ProjectionExprs::from(remapped_projection),
282 }
283 }
284}
285
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow::array::AsArray;
    use arrow::datatypes::{DataType, SchemaRef};
    use datafusion_common::{DFSchema, ScalarValue, record_batch};
    use datafusion_expr::{Expr, col, execution_props::ExecutionProps};
    use datafusion_physical_expr::{create_physical_exprs, projection::ProjectionExpr};
    use itertools::Itertools;

    use super::*;

    /// Plans logical `exprs` against `schema` and wraps the physical
    /// expressions as projection expressions aliased `col0`, `col1`, ...
    fn create_projection_exprs<'a>(
        exprs: impl IntoIterator<Item = &'a Expr>,
        schema: &SchemaRef,
    ) -> ProjectionExprs {
        let df_schema = DFSchema::try_from(Arc::clone(schema)).unwrap();
        let physical_exprs =
            create_physical_exprs(exprs, &df_schema, &ExecutionProps::default()).unwrap();
        let projection_exprs = physical_exprs
            .into_iter()
            .enumerate()
            .map(|(i, e)| ProjectionExpr::new(Arc::clone(&e), format!("col{i}")))
            .collect_vec();
        ProjectionExprs::from(projection_exprs)
    }

    /// Asserts the `(in_remainder_projection, in_partition_values)` pairs of
    /// `split.partition_columns`, in order.
    fn assert_partition_columns(split: &SplitProjection, expected: &[(usize, usize)]) {
        let actual = split
            .partition_columns
            .iter()
            .map(|pci| (pci.in_remainder_projection, pci.in_partition_values))
            .collect_vec();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_split_projection_with_partition_columns() {
        use arrow::datatypes::Field;
        // Three file columns plus one partition column ("date").
        let file_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("bool_col", DataType::Boolean, false),
            Field::new("tinyint_col", DataType::Int8, false),
        ]));

        let table_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("bool_col", DataType::Boolean, false),
            Field::new("tinyint_col", DataType::Int8, false),
            Field::new("date", DataType::Utf8, false),
        ]));

        // The partition column is projected between two file columns.
        let projection_indices = vec![0, 1, 3, 2];

        let projection =
            ProjectionExprs::from_indices(&projection_indices, &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, vec![0, 1, 2]);

        assert_eq!(split.partition_columns.len(), 1);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);
        // The partition column is renumbered to follow the three file columns.
        assert_eq!(split.partition_columns[0].in_remainder_projection, 3);

        let file_batch = record_batch!(
            ("id", Int32, vec![4]),
            ("bool_col", Boolean, vec![true]),
            ("tinyint_col", Int8, vec![0])
        )
        .unwrap();

        let partition_values = vec![ScalarValue::from("2021-10-26")];

        // Inject using the mapping computed by the split itself, so the test
        // exercises `SplitProjection::new` end to end.
        let injected_projection = inject_partition_columns_into_projection(
            &split.remapped_projection,
            &split.partition_columns,
            partition_values,
        );

        let projector = injected_projection
            .make_projector(&file_batch.schema())
            .unwrap();
        let result = projector.project_batch(&file_batch).unwrap();

        assert_eq!(result.num_columns(), 4);
        assert_eq!(
            result
                .column(0)
                .as_primitive::<arrow::datatypes::Int32Type>()
                .value(0),
            4
        );
        assert!(result.column(1).as_boolean().value(0));
        assert_eq!(result.column(2).as_string::<i32>().value(0), "2021-10-26");
        assert_eq!(
            result
                .column(3)
                .as_primitive::<arrow::datatypes::Int8Type>()
                .value(0),
            0
        );
    }

    /// Builds a file schema of `file_cols` Int32 columns named `col_{i}` and a
    /// table schema that appends `partition_cols` Utf8 columns named `part_{i}`.
    fn create_test_schemas(
        file_cols: usize,
        partition_cols: usize,
    ) -> (SchemaRef, SchemaRef) {
        use arrow::datatypes::Field;

        let file_fields: Vec<_> = (0..file_cols)
            .map(|i| Field::new(format!("col_{i}"), DataType::Int32, false))
            .collect();

        let mut table_fields = file_fields.clone();
        table_fields.extend(
            (0..partition_cols)
                .map(|i| Field::new(format!("part_{i}"), DataType::Utf8, false)),
        );

        (
            Arc::new(Schema::new(file_fields)),
            Arc::new(Schema::new(table_fields)),
        )
    }

    #[test]
    fn test_split_projection_only_file_columns() {
        let (file_schema, table_schema) = create_test_schemas(3, 2);
        let projection = ProjectionExprs::from_indices(&[0, 1, 2], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, vec![0, 1, 2]);
        assert_eq!(split.partition_columns.len(), 0);
        assert_partition_columns(&split, &[]);
    }

    #[test]
    fn test_split_projection_only_partition_columns() {
        let (file_schema, table_schema) = create_test_schemas(3, 2);
        let projection = ProjectionExprs::from_indices(&[3, 4], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, Vec::<usize>::new());
        assert_eq!(split.partition_columns.len(), 2);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);
        assert_eq!(split.partition_columns[1].in_partition_values, 1);
        // With no file columns, partition columns start at position 0.
        assert_partition_columns(&split, &[(0, 0), (1, 1)]);
    }

    #[test]
    fn test_split_projection_multiple_partition_columns() {
        let (file_schema, table_schema) = create_test_schemas(2, 3);
        let projection = ProjectionExprs::from_indices(&[0, 2, 4, 1, 3], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, vec![0, 1]);
        assert_eq!(split.partition_columns.len(), 3);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);
        assert_eq!(split.partition_columns[1].in_partition_values, 1);
        assert_eq!(split.partition_columns[2].in_partition_values, 2);
        assert_partition_columns(&split, &[(2, 0), (3, 1), (4, 2)]);

        assert_eq!(split.remapped_projection.iter().count(), 5);
    }

    #[test]
    fn test_split_projection_partition_columns_reverse_order() {
        let (file_schema, table_schema) = create_test_schemas(2, 2);
        let projection = ProjectionExprs::from_indices(&[3, 2], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, Vec::<usize>::new());
        assert_eq!(split.partition_columns.len(), 2);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);
        assert_eq!(split.partition_columns[1].in_partition_values, 1);
        // Partition columns are numbered by table index, not projection order.
        assert_partition_columns(&split, &[(0, 0), (1, 1)]);
    }

    #[test]
    fn test_split_projection_interleaved_file_and_partition() {
        let (file_schema, table_schema) = create_test_schemas(3, 3);
        let projection =
            ProjectionExprs::from_indices(&[0, 3, 1, 4, 2, 5], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, vec![0, 1, 2]);
        assert_eq!(split.partition_columns.len(), 3);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);
        assert_eq!(split.partition_columns[1].in_partition_values, 1);
        assert_eq!(split.partition_columns[2].in_partition_values, 2);
        assert_partition_columns(&split, &[(3, 0), (4, 1), (5, 2)]);
    }

    #[test]
    fn test_split_projection_expression_with_file_and_partition_columns() {
        use arrow::datatypes::Field;

        let file_schema = Arc::new(Schema::new(vec![
            Field::new("file_a", DataType::Int32, false),
            Field::new("file_b", DataType::Int32, false),
        ]));
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("file_a", DataType::Int32, false),
            Field::new("file_b", DataType::Int32, false),
            Field::new("part_c", DataType::Int32, false),
        ]));

        // A single expression that mixes a file column and a partition column.
        let exprs = [col("file_a") + col("part_c")];
        let projection = create_projection_exprs(exprs.iter(), &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        // Only the referenced file column is read.
        assert_eq!(split.file_indices, vec![0]);
        assert_eq!(split.partition_columns.len(), 1);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);
        assert_partition_columns(&split, &[(1, 0)]);
    }

    #[test]
    fn test_split_projection_boundary_last_file_column() {
        let (file_schema, table_schema) = create_test_schemas(3, 2);
        let projection = ProjectionExprs::from_indices(&[2], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, vec![2]);
        assert_eq!(split.partition_columns.len(), 0);
        assert_partition_columns(&split, &[]);
    }

    #[test]
    fn test_split_projection_boundary_first_partition_column() {
        let (file_schema, table_schema) = create_test_schemas(3, 2);
        let projection = ProjectionExprs::from_indices(&[3], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, Vec::<usize>::new());
        assert_eq!(split.partition_columns.len(), 1);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);
        assert_partition_columns(&split, &[(0, 0)]);
    }

    #[test]
    fn test_inject_partition_columns_multiple_partitions() {
        let data =
            record_batch!(("col_0", Int32, vec![1]), ("col_1", Int32, vec![2])).unwrap();

        // Two file columns interleaved with two partition columns.
        let (file_schema, table_schema) = create_test_schemas(2, 2);
        let projection = ProjectionExprs::from_indices(&[0, 2, 1, 3], &table_schema);
        let split = SplitProjection::new(&file_schema, &projection);

        assert_partition_columns(&split, &[(2, 0), (3, 1)]);

        let partition_values =
            vec![ScalarValue::from("part_a"), ScalarValue::from("part_b")];

        let injected = inject_partition_columns_into_projection(
            &split.remapped_projection,
            &split.partition_columns,
            partition_values,
        );

        let projector = injected.make_projector(&data.schema()).unwrap();
        let result = projector.project_batch(&data).unwrap();

        assert_eq!(result.num_columns(), 4);
        assert_eq!(
            result
                .column(0)
                .as_primitive::<arrow::datatypes::Int32Type>()
                .value(0),
            1
        );
        assert_eq!(result.column(1).as_string::<i32>().value(0), "part_a");
        assert_eq!(
            result
                .column(2)
                .as_primitive::<arrow::datatypes::Int32Type>()
                .value(0),
            2
        );
        assert_eq!(result.column(3).as_string::<i32>().value(0), "part_b");
    }
}