datafusion_datasource/projection.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::{
    Result, ScalarValue,
    tree_node::{Transformed, TransformedResult, TreeNode},
};
use datafusion_physical_expr::{
    expressions::{Column, Literal},
    projection::{ProjectionExpr, ProjectionExprs},
};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;

use crate::{
    PartitionedFile, TableSchema,
    file_stream::{FileOpenFuture, FileOpener},
};

/// A file opener that applies a projection on top of an inner opener.
///
/// This includes handling partition columns.
///
/// Any projection pushed down is split into:
/// - simple column indices / plain column selection
/// - a remainder projection that this opener applies on top of that selection
///
/// This is meant to simplify projection pushdown for sources like CSV
/// that can only handle "simple" column selection.
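///
/// A sketch of the intended wiring (illustrative only; the variable names are
/// placeholders and the block is marked `ignore` so it is not compiled as a doctest):
///
/// ```ignore
/// // Split the pushed-down projection into plain file column indices + a remainder.
/// let split = SplitProjection::new(logical_file_schema, &pushed_down_projection);
/// // `inner` is a source-specific opener that reads only `split.file_indices`.
/// let inner: Arc<dyn FileOpener> = /* e.g. a CSV opener */;
/// // The returned opener evaluates the remainder projection (including partition
/// // column substitution) on every batch produced by `inner`.
/// let opener = ProjectionOpener::try_new(split, inner, logical_file_schema)?;
/// ```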
pub struct ProjectionOpener {
    inner: Arc<dyn FileOpener>,
    projection: ProjectionExprs,
    input_schema: SchemaRef,
    partition_columns: Vec<PartitionColumnIndex>,
}

impl ProjectionOpener {
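    /// Creates a [`ProjectionOpener`] wrapping `inner`: the file is read using only
    /// the columns selected by `projection.file_indices`, and the remapped remainder
    /// projection is applied to every batch `inner` produces.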
    pub fn try_new(
        projection: SplitProjection,
        inner: Arc<dyn FileOpener>,
        file_schema: &Schema,
    ) -> Result<Arc<dyn FileOpener>> {
        Ok(Arc::new(ProjectionOpener {
            inner,
            projection: projection.remapped_projection,
            input_schema: Arc::new(file_schema.project(&projection.file_indices)?),
            partition_columns: projection.partition_columns,
        }))
    }
}

impl FileOpener for ProjectionOpener {
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let partition_values = partitioned_file.partition_values.clone();
        // Modify any references to partition columns in the projection expressions
        // and substitute them with literal values from PartitionedFile.partition_values
        let projection = if self.partition_columns.is_empty() {
            self.projection.clone()
        } else {
            inject_partition_columns_into_projection(
                &self.projection,
                &self.partition_columns,
                partition_values,
            )
        };
        let projector = projection.make_projector(&self.input_schema)?;

        let inner = self.inner.open(partitioned_file)?;

        Ok(async move {
            let stream = inner.await?;
            let stream = stream.map(move |batch| {
                let batch = batch?;
                let batch = projector.project_batch(&batch)?;
                Ok(batch)
            });
            Ok(stream.boxed())
        }
        .boxed())
    }
}

#[derive(Debug, Clone, Copy)]
pub struct PartitionColumnIndex {
    /// The index of this partition column in the remainder projection (>= num_file_columns)
    pub in_remainder_projection: usize,
    /// The index of this partition column in the partition_values array
    pub in_partition_values: usize,
}

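/// Rewrites `projection`, replacing every column reference that points at a partition
/// column (according to `partition_columns`) with a [`Literal`] built from the
/// corresponding entry in `partition_values`.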
fn inject_partition_columns_into_projection(
    projection: &ProjectionExprs,
    partition_columns: &[PartitionColumnIndex],
    partition_values: Vec<ScalarValue>,
) -> ProjectionExprs {
    // Pre-create all literals for partition columns to avoid cloning ScalarValues multiple times.
    let partition_literals: Vec<Arc<Literal>> = partition_values
        .into_iter()
        .map(|value| Arc::new(Literal::new(value)))
        .collect();

    let projections = projection
        .iter()
        .map(|projection| {
            let expr = Arc::clone(&projection.expr)
                .transform(|expr| {
                    let original_expr = Arc::clone(&expr);
                    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
                        // Check if this column index corresponds to a partition column
                        if let Some(pci) = partition_columns
                            .iter()
                            .find(|pci| pci.in_remainder_projection == column.index())
                        {
                            let literal =
                                Arc::clone(&partition_literals[pci.in_partition_values]);
                            return Ok(Transformed::yes(literal));
                        }
                    }
                    Ok(Transformed::no(original_expr))
                })
                .data()
                .expect("infallible transform");
            ProjectionExpr::new(expr, projection.alias.clone())
        })
        .collect_vec();
    ProjectionExprs::new(projections)
}

/// At a high level, the goal of [`SplitProjection`] is to take a [`ProjectionExprs`] meant to be applied to the table schema
/// and split it into:
/// - The projection indices into the file schema (`file_indices`)
/// - The partition column mappings (`partition_columns`), which pre-compute both the index into the remapped
///   (remainder) projection and the index into the partition values array
/// - A remapped projection that can be applied after the file projection is applied.
///   This remapped projection has the following properties:
///     - Column indices referring to file columns are remapped to [0..file_indices.len())
///     - Column indices referring to partition columns are remapped to [file_indices.len()..)
///
///   This allows the [`ProjectionOpener`] to easily identify which columns in the remapped projection
///   refer to partition columns and substitute them with literals from the partition values.
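///
/// For example (a sketch mirroring the tests below, marked `ignore` so it is not
/// compiled as a doctest): with a table schema of three file columns followed by one
/// partition column, and a pushed-down projection of `[0, 1, 3, 2]`:
///
/// ```ignore
/// let projection = ProjectionExprs::from_indices(&[0, 1, 3, 2], &table_schema);
/// let split = SplitProjection::new(&file_schema, &projection);
/// // Only the real file columns are read from the file:
/// assert_eq!(split.file_indices, vec![0, 1, 2]);
/// // The partition column maps to the first entry of `partition_values`, and the
/// // remapped projection refers to it as column 3 (i.e. >= file_indices.len()).
/// assert_eq!(split.partition_columns[0].in_partition_values, 0);
/// ```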
#[derive(Debug, Clone)]
pub struct SplitProjection {
    /// The original projection this [`SplitProjection`] was derived from
    pub source: ProjectionExprs,
    /// Column indices to read from file (public for file sources)
    pub file_indices: Vec<usize>,
    /// Pre-computed partition column mappings (internal, used by ProjectionOpener)
    pub(crate) partition_columns: Vec<PartitionColumnIndex>,
    /// The remapped projection (internal, used by ProjectionOpener)
    pub(crate) remapped_projection: ProjectionExprs,
}

impl SplitProjection {
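    /// Creates a [`SplitProjection`] that selects every column of the table schema,
    /// i.e. the case where no projection was pushed down.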
    pub fn unprojected(table_schema: &TableSchema) -> Self {
        let projection = ProjectionExprs::from_indices(
            &(0..table_schema.table_schema().fields().len()).collect_vec(),
            table_schema.table_schema(),
        );
        Self::new(table_schema.file_schema(), &projection)
    }

    /// Creates a new [`SplitProjection`] by splitting a projection into
    /// simple file column indices and a remainder projection that is applied after reading the file.
    ///
    /// In other words: we get a `Vec<usize>` projection that is meant to be applied on top of `logical_file_schema`
    /// and a remainder projection that is applied to the result of that first projection.
    ///
    /// Here `logical_file_schema` is expected to be the *logical* schema of the file, that is the
    /// table schema minus any partition columns.
    /// Partition columns are always expected to be at the end of the table schema.
    /// Note that `logical_file_schema` is *not* the physical schema of the file.
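    ///
    /// A minimal sketch of the expected schema shapes (illustrative only, not compiled;
    /// the schema and variable names are placeholders taken from the tests below):
    ///
    /// ```ignore
    /// // table schema:        [id, bool_col, tinyint_col, date]  (`date` is a partition column)
    /// // logical file schema: [id, bool_col, tinyint_col]        (partition columns stripped off the end)
    /// let projection = ProjectionExprs::from_indices(&[0, 1, 3, 2], &table_schema);
    /// let split = SplitProjection::new(&logical_file_schema, &projection);
    /// // In the remainder, file columns keep indices 0..3 and `date` becomes column 3,
    /// // which the ProjectionOpener later replaces with a literal partition value.
    /// ```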
    pub fn new(logical_file_schema: &Schema, projection: &ProjectionExprs) -> Self {
        let num_file_schema_columns = logical_file_schema.fields().len();

        // Collect all unique columns and classify as file or partition
        let mut file_columns = Vec::new();
        let mut partition_columns = Vec::new();
        let mut all_columns = std::collections::HashMap::new();

        // Extract all unique column references (index -> name)
        for proj_expr in projection {
            proj_expr
                .expr
                .apply(|expr| {
                    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
                        all_columns
                            .entry(column.index())
                            .or_insert_with(|| column.name().to_string());
                    }
                    Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
                })
                .expect("infallible apply");
        }

        // Sort by index and classify into file vs partition columns
        let mut sorted_columns: Vec<_> = all_columns
            .into_iter()
            .map(|(idx, name)| (name, idx))
            .collect();
        sorted_columns.sort_by_key(|(_, idx)| *idx);

        // Separate file and partition columns, assigning final indices
        // Pre-create all remapped columns to avoid duplicate Arc'd expressions
        let mut column_mapping = std::collections::HashMap::new();
        let mut file_idx = 0;
        let mut partition_idx = 0;

        for (name, original_index) in sorted_columns {
            let new_index = if original_index < num_file_schema_columns {
                // File column: gets index [0..num_file_columns)
                file_columns.push(original_index);
                let idx = file_idx;
                file_idx += 1;
                idx
            } else {
                // Partition column: gets index [num_file_columns..)
                partition_columns.push(original_index);
                let idx = file_idx + partition_idx;
                partition_idx += 1;
                idx
            };

            // Pre-create the remapped column so all references can share the same Arc
            let new_column: Arc<dyn datafusion_physical_plan::PhysicalExpr> =
                Arc::new(Column::new(&name, new_index));
            column_mapping.insert(original_index, new_column);
        }

        // Single tree transformation: remap all column references using pre-created columns
        let remapped_projection = projection
            .iter()
            .map(|proj_expr| {
                let expr = Arc::clone(&proj_expr.expr)
                    .transform(|expr| {
                        let original_expr = Arc::clone(&expr);
                        if let Some(column) = expr.as_any().downcast_ref::<Column>()
                            && let Some(new_column) = column_mapping.get(&column.index())
                        {
                            return Ok(Transformed::yes(Arc::clone(new_column)));
                        }
                        Ok(Transformed::no(original_expr))
                    })
                    .data()
                    .expect("infallible transform");
                ProjectionExpr::new(expr, proj_expr.alias.clone())
            })
            .collect_vec();

        // Pre-compute partition column mappings for ProjectionOpener
        let num_file_columns = file_columns.len();
        let partition_column_mappings = partition_columns
            .iter()
            .enumerate()
            .map(|(partition_idx, &table_index)| PartitionColumnIndex {
                in_remainder_projection: num_file_columns + partition_idx,
                in_partition_values: table_index - num_file_schema_columns,
            })
            .collect_vec();

        Self {
            source: projection.clone(),
            file_indices: file_columns,
            partition_columns: partition_column_mappings,
            remapped_projection: ProjectionExprs::from(remapped_projection),
        }
    }
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow::array::AsArray;
    use arrow::datatypes::{DataType, SchemaRef};
    use datafusion_common::{DFSchema, ScalarValue, record_batch};
    use datafusion_expr::{Expr, col, execution_props::ExecutionProps};
    use datafusion_physical_expr::{create_physical_exprs, projection::ProjectionExpr};
    use itertools::Itertools;

    use super::*;

    fn create_projection_exprs<'a>(
        exprs: impl IntoIterator<Item = &'a Expr>,
        schema: &SchemaRef,
    ) -> ProjectionExprs {
        let df_schema = DFSchema::try_from(Arc::clone(schema)).unwrap();
        let physical_exprs =
            create_physical_exprs(exprs, &df_schema, &ExecutionProps::default()).unwrap();
        let projection_exprs = physical_exprs
            .into_iter()
            .enumerate()
            .map(|(i, e)| ProjectionExpr::new(Arc::clone(&e), format!("col{i}")))
            .collect_vec();
        ProjectionExprs::from(projection_exprs)
    }

    #[test]
    fn test_split_projection_with_partition_columns() {
        use arrow::array::AsArray;
        use arrow::datatypes::Field;
        // Simulate the avro_exec_with_partition test scenario:
        // file_schema has 3 fields
        let file_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("bool_col", DataType::Boolean, false),
            Field::new("tinyint_col", DataType::Int8, false),
        ]));

        // table_schema has 4 fields (3 file + 1 partition)
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("bool_col", DataType::Boolean, false),
            Field::new("tinyint_col", DataType::Int8, false),
            Field::new("date", DataType::Utf8, false), // partition column at index 3
        ]));

        // projection indices: [0, 1, 3, 2]
        // This should select: id (0), bool_col (1), date (3-partition), tinyint_col (2)
        let projection_indices = vec![0, 1, 3, 2];

        // Create projection expressions from indices using the table schema
        let projection =
            ProjectionExprs::from_indices(&projection_indices, &table_schema);

        // Call SplitProjection to separate file and partition columns
        let split = SplitProjection::new(&file_schema, &projection);

        // The file_indices should be [0, 1, 2] (all file columns needed)
        assert_eq!(split.file_indices, vec![0, 1, 2]);

        // Should have 1 partition column at in_partition_values index 0
        assert_eq!(split.partition_columns.len(), 1);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);

        // Now create a batch with only the file columns
        let file_batch = record_batch!(
            ("id", Int32, vec![4]),
            ("bool_col", Boolean, vec![true]),
            ("tinyint_col", Int8, vec![0])
        )
        .unwrap();

        // After the fix, the remainder projection should have remapped indices:
        // - File columns: [0, 1, 2] (unchanged since they're already in order)
        // - Partition column: [3] (stays at index 3, which is >= num_file_columns)
        // So the remainder expects input columns [0, 1, 2] and references column [3] for partition

        // Verify that we can inject partition columns and apply the projection
        let partition_values = vec![ScalarValue::from("2021-10-26")];

        // Create partition column mapping
        let partition_columns = vec![PartitionColumnIndex {
            in_remainder_projection: 3, // partition column is at index 3 in remainder
            in_partition_values: 0,     // first partition value
        }];

        // Inject partition columns (replaces Column(3) with Literal)
        let injected_projection = inject_partition_columns_into_projection(
            &split.remapped_projection,
            &partition_columns,
            partition_values,
        );

        // Now the projection should work on the file batch
        let projector = injected_projection
            .make_projector(&file_batch.schema())
            .unwrap();
        let result = projector.project_batch(&file_batch).unwrap();

        // Verify the output has the correct column order: id, bool_col, date, tinyint_col
        assert_eq!(result.num_columns(), 4);
        assert_eq!(
            result
                .column(0)
                .as_primitive::<arrow::datatypes::Int32Type>()
                .value(0),
            4
        );
        assert!(result.column(1).as_boolean().value(0));
        assert_eq!(result.column(2).as_string::<i32>().value(0), "2021-10-26");
        assert_eq!(
            result
                .column(3)
                .as_primitive::<arrow::datatypes::Int8Type>()
                .value(0),
            0
        );
    }

    // ========================================================================
    // Comprehensive Test Suite for SplitProjection
    // ========================================================================

    // Helper to create test schemas with file and partition columns
    fn create_test_schemas(
        file_cols: usize,
        partition_cols: usize,
    ) -> (SchemaRef, SchemaRef) {
        use arrow::datatypes::Field;

        let file_fields: Vec<_> = (0..file_cols)
            .map(|i| Field::new(format!("col_{i}"), DataType::Int32, false))
            .collect();

        let mut table_fields = file_fields.clone();
        table_fields.extend(
            (0..partition_cols)
                .map(|i| Field::new(format!("part_{i}"), DataType::Utf8, false)),
        );

        (
            Arc::new(Schema::new(file_fields)),
            Arc::new(Schema::new(table_fields)),
        )
    }

    // ========================================================================
    // Partition Column Handling Tests
    // ========================================================================

    #[test]
    fn test_split_projection_only_file_columns() {
        let (file_schema, table_schema) = create_test_schemas(3, 2);
        // Select only file columns [0, 1, 2]
        let projection = ProjectionExprs::from_indices(&[0, 1, 2], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, vec![0, 1, 2]);
        assert_eq!(split.partition_columns.len(), 0);
    }

    #[test]
    fn test_split_projection_only_partition_columns() {
        let (file_schema, table_schema) = create_test_schemas(3, 2);
        // Select only partition columns [3, 4]
        let projection = ProjectionExprs::from_indices(&[3, 4], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, Vec::<usize>::new());
        assert_eq!(split.partition_columns.len(), 2);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);
        assert_eq!(split.partition_columns[1].in_partition_values, 1);
    }

    #[test]
    fn test_split_projection_multiple_partition_columns() {
        let (file_schema, table_schema) = create_test_schemas(2, 3);
        // File cols: 0, 1; Partition cols: 2, 3, 4
        // Select: [0, 2, 4, 1, 3] (mixed file and partition)
        let projection = ProjectionExprs::from_indices(&[0, 2, 4, 1, 3], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, vec![0, 1]);
        assert_eq!(split.partition_columns.len(), 3);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);
        assert_eq!(split.partition_columns[1].in_partition_values, 1);
        assert_eq!(split.partition_columns[2].in_partition_values, 2);

        // Verify remapped projection has correct indices
        // File columns should be at [0, 1], partition columns at [2, 3, 4]
        assert_eq!(split.remapped_projection.iter().count(), 5);
    }

    #[test]
    fn test_split_projection_partition_columns_reverse_order() {
        let (file_schema, table_schema) = create_test_schemas(2, 2);
        // File cols: 0, 1; Partition cols: 2, 3
        // Select: [3, 2] (partitions in reverse)
        let projection = ProjectionExprs::from_indices(&[3, 2], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, Vec::<usize>::new());
        assert_eq!(split.partition_columns.len(), 2);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);
        assert_eq!(split.partition_columns[1].in_partition_values, 1);
    }

    #[test]
    fn test_split_projection_interleaved_file_and_partition() {
        let (file_schema, table_schema) = create_test_schemas(3, 3);
        // File cols: 0, 1, 2; Partition cols: 3, 4, 5
        // Select: [0, 3, 1, 4, 2, 5] (alternating)
        let projection =
            ProjectionExprs::from_indices(&[0, 3, 1, 4, 2, 5], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, vec![0, 1, 2]);
        assert_eq!(split.partition_columns.len(), 3);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);
        assert_eq!(split.partition_columns[1].in_partition_values, 1);
        assert_eq!(split.partition_columns[2].in_partition_values, 2);
    }

    #[test]
    fn test_split_projection_expression_with_file_and_partition_columns() {
        use arrow::datatypes::Field;

        // Create schemas: 2 file columns, 1 partition column
        let file_schema = Arc::new(Schema::new(vec![
            Field::new("file_a", DataType::Int32, false),
            Field::new("file_b", DataType::Int32, false),
        ]));
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("file_a", DataType::Int32, false),
            Field::new("file_b", DataType::Int32, false),
            Field::new("part_c", DataType::Int32, false),
        ]));

        // Create expression: file_a + part_c
        let exprs = [col("file_a") + col("part_c")];
        let projection = create_projection_exprs(exprs.iter(), &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        // Should extract both columns
        assert_eq!(split.file_indices, vec![0]);
        assert_eq!(split.partition_columns.len(), 1);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);
    }

    // ========================================================================
    // Category 4: Boundary Conditions
    // ========================================================================

    #[test]
    fn test_split_projection_boundary_last_file_column() {
        let (file_schema, table_schema) = create_test_schemas(3, 2);
        // Last file column is index 2
        let projection = ProjectionExprs::from_indices(&[2], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, vec![2]);
        assert_eq!(split.partition_columns.len(), 0);
    }

    #[test]
    fn test_split_projection_boundary_first_partition_column() {
        let (file_schema, table_schema) = create_test_schemas(3, 2);
        // First partition column is index 3
        let projection = ProjectionExprs::from_indices(&[3], &table_schema);

        let split = SplitProjection::new(&file_schema, &projection);

        assert_eq!(split.file_indices, Vec::<usize>::new());
        assert_eq!(split.partition_columns.len(), 1);
        assert_eq!(split.partition_columns[0].in_partition_values, 0);
    }

    // ========================================================================
    // Category 6: Integration Tests
    // ========================================================================

    #[test]
    fn test_inject_partition_columns_multiple_partitions() {
        let data =
            record_batch!(("col_0", Int32, vec![1]), ("col_1", Int32, vec![2])).unwrap();

        // Create projection that references file columns and partition columns
        let (file_schema, table_schema) = create_test_schemas(2, 2);
        // Projection: [0, 2, 1, 3] = [file_0, part_0, file_1, part_1]
        let projection = ProjectionExprs::from_indices(&[0, 2, 1, 3], &table_schema);
        let split = SplitProjection::new(&file_schema, &projection);

        // Create partition column mappings
        let partition_columns = vec![
            PartitionColumnIndex {
                in_remainder_projection: 2, // First partition column at index 2
                in_partition_values: 0,
            },
            PartitionColumnIndex {
                in_remainder_projection: 3, // Second partition column at index 3
                in_partition_values: 1,
            },
        ];

        let partition_values =
            vec![ScalarValue::from("part_a"), ScalarValue::from("part_b")];

        let injected = inject_partition_columns_into_projection(
            &split.remapped_projection,
            &partition_columns,
            partition_values,
        );

        // Apply projection
        let projector = injected.make_projector(&data.schema()).unwrap();
        let result = projector.project_batch(&data).unwrap();

        assert_eq!(result.num_columns(), 4);
        assert_eq!(
            result
                .column(0)
                .as_primitive::<arrow::datatypes::Int32Type>()
                .value(0),
            1
        );
        assert_eq!(result.column(1).as_string::<i32>().value(0), "part_a");
        assert_eq!(
            result
                .column(2)
                .as_primitive::<arrow::datatypes::Int32Type>()
                .value(0),
            2
        );
        assert_eq!(result.column(3).as_string::<i32>().value(0), "part_b");
    }
}