datafusion_physical_expr/
projection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Deref;
19use std::sync::Arc;
20
21use crate::expressions::Column;
22use crate::utils::collect_columns;
23use crate::PhysicalExpr;
24
25use arrow::datatypes::{Field, Schema, SchemaRef};
26use datafusion_common::stats::{ColumnStatistics, Precision};
27use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
28use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
29
30use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
31use indexmap::IndexMap;
32use itertools::Itertools;
33
34/// A projection expression as used by projection operations.
35///
36/// The expression is evaluated and the result is stored in a column
37/// with the name specified by `alias`.
38///
39/// For example, the SQL expression `a + b AS sum_ab` would be represented
40/// as a `ProjectionExpr` where `expr` is the expression `a + b`
41/// and `alias` is the string `sum_ab`.
42#[derive(Debug, Clone)]
43pub struct ProjectionExpr {
44    /// The expression that will be evaluated.
45    pub expr: Arc<dyn PhysicalExpr>,
46    /// The name of the output column for use an output schema.
47    pub alias: String,
48}
49
50impl std::fmt::Display for ProjectionExpr {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        if self.expr.to_string() == self.alias {
53            write!(f, "{}", self.alias)
54        } else {
55            write!(f, "{} AS {}", self.expr, self.alias)
56        }
57    }
58}
59
60impl ProjectionExpr {
61    /// Create a new projection expression
62    pub fn new(expr: Arc<dyn PhysicalExpr>, alias: String) -> Self {
63        Self { expr, alias }
64    }
65
66    /// Create a new projection expression from an expression and a schema using the expression's output field name as alias.
67    pub fn new_from_expression(
68        expr: Arc<dyn PhysicalExpr>,
69        schema: &Schema,
70    ) -> Result<Self> {
71        let field = expr.return_field(schema)?;
72        Ok(Self {
73            expr,
74            alias: field.name().to_string(),
75        })
76    }
77}
78
79impl From<(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
80    fn from(value: (Arc<dyn PhysicalExpr>, String)) -> Self {
81        Self::new(value.0, value.1)
82    }
83}
84
85impl From<&(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
86    fn from(value: &(Arc<dyn PhysicalExpr>, String)) -> Self {
87        Self::new(Arc::clone(&value.0), value.1.clone())
88    }
89}
90
91impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
92    fn from(value: ProjectionExpr) -> Self {
93        (value.expr, value.alias)
94    }
95}
96
97/// A collection of projection expressions.
98///
99/// This struct encapsulates multiple `ProjectionExpr` instances,
100/// representing a complete projection operation and provides
101/// methods to manipulate and analyze the projection as a whole.
102#[derive(Debug, Clone)]
103pub struct ProjectionExprs {
104    exprs: Vec<ProjectionExpr>,
105}
106
107impl std::fmt::Display for ProjectionExprs {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        let exprs: Vec<String> = self.exprs.iter().map(|e| e.to_string()).collect();
110        write!(f, "Projection[{}]", exprs.join(", "))
111    }
112}
113
114impl From<Vec<ProjectionExpr>> for ProjectionExprs {
115    fn from(value: Vec<ProjectionExpr>) -> Self {
116        Self { exprs: value }
117    }
118}
119
120impl From<&[ProjectionExpr]> for ProjectionExprs {
121    fn from(value: &[ProjectionExpr]) -> Self {
122        Self {
123            exprs: value.to_vec(),
124        }
125    }
126}
127
128impl FromIterator<ProjectionExpr> for ProjectionExprs {
129    fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self {
130        Self {
131            exprs: exprs.into_iter().collect::<Vec<_>>(),
132        }
133    }
134}
135
136impl AsRef<[ProjectionExpr]> for ProjectionExprs {
137    fn as_ref(&self) -> &[ProjectionExpr] {
138        &self.exprs
139    }
140}
141
142impl ProjectionExprs {
143    pub fn new<I>(exprs: I) -> Self
144    where
145        I: IntoIterator<Item = ProjectionExpr>,
146    {
147        Self {
148            exprs: exprs.into_iter().collect::<Vec<_>>(),
149        }
150    }
151
152    /// Creates a [`ProjectionExpr`] from a list of column indices.
153    ///
154    /// This is a convenience method for creating simple column-only projections, where each projection expression is a reference to a column
155    /// in the input schema.
156    ///
157    /// # Behavior
158    /// - Ordering: the output projection preserves the exact order of indices provided in the input slice
159    ///   For example, `[2, 0, 1]` will produce projections for columns 2, 0, then 1 in that order
160    /// - Duplicates: Duplicate indices are allowed and will create multiple projection expressions referencing the same source column
161    ///   For example, `[0, 0]` creates 2 separate projections both referencing column 0
162    ///
163    /// # Panics
164    /// Panics if any index in `indices` is out of bounds for the provided schema.
165    ///
166    /// # Example
167    ///
168    /// ```rust
169    /// use arrow::datatypes::{DataType, Field, Schema};
170    /// use datafusion_physical_expr::projection::ProjectionExprs;
171    /// use std::sync::Arc;
172    ///
173    /// // Create a schema with three columns
174    /// let schema = Arc::new(Schema::new(vec![
175    ///     Field::new("a", DataType::Int32, false),
176    ///     Field::new("b", DataType::Utf8, false),
177    ///     Field::new("c", DataType::Float64, false),
178    /// ]));
179    ///
180    /// // Project columns at indices 2 and 0 (c and a) - ordering is preserved
181    /// let projection = ProjectionExprs::from_indices(&[2, 0], &schema);
182    ///
183    /// // This creates: SELECT c@2 AS c, a@0 AS a
184    /// assert_eq!(projection.as_ref().len(), 2);
185    /// assert_eq!(projection.as_ref()[0].alias, "c");
186    /// assert_eq!(projection.as_ref()[1].alias, "a");
187    ///
188    /// // Duplicate indices are allowed
189    /// let projection_with_dups = ProjectionExprs::from_indices(&[0, 0, 1], &schema);
190    /// assert_eq!(projection_with_dups.as_ref().len(), 3);
191    /// assert_eq!(projection_with_dups.as_ref()[0].alias, "a");
192    /// assert_eq!(projection_with_dups.as_ref()[1].alias, "a"); // duplicate
193    /// assert_eq!(projection_with_dups.as_ref()[2].alias, "b");
194    /// ```
195    pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Self {
196        let projection_exprs = indices.iter().map(|&i| {
197            let field = schema.field(i);
198            ProjectionExpr {
199                expr: Arc::new(Column::new(field.name(), i)),
200                alias: field.name().clone(),
201            }
202        });
203
204        Self::from_iter(projection_exprs)
205    }
206
207    /// Returns an iterator over the projection expressions
208    pub fn iter(&self) -> impl Iterator<Item = &ProjectionExpr> {
209        self.exprs.iter()
210    }
211
212    /// Creates a ProjectionMapping from this projection
213    pub fn projection_mapping(
214        &self,
215        input_schema: &SchemaRef,
216    ) -> Result<ProjectionMapping> {
217        ProjectionMapping::try_new(
218            self.exprs
219                .iter()
220                .map(|p| (Arc::clone(&p.expr), p.alias.clone())),
221            input_schema,
222        )
223    }
224
225    /// Iterate over a clone of the projection expressions.
226    pub fn expr_iter(&self) -> impl Iterator<Item = Arc<dyn PhysicalExpr>> + '_ {
227        self.exprs.iter().map(|e| Arc::clone(&e.expr))
228    }
229
230    /// Apply another projection on top of this projection, returning the combined projection.
231    /// For example, if this projection is `SELECT c@2 AS x, b@1 AS y, a@0 as z` and the other projection is `SELECT x@0 + 1 AS c1, y@1 + z@2 as c2`,
232    /// we return a projection equivalent to `SELECT c@2 + 1 AS c1, b@1 + a@0 as c2`.
233    ///
234    /// # Example
235    ///
236    /// ```rust
237    /// use datafusion_common::{Result, ScalarValue};
238    /// use datafusion_expr::Operator;
239    /// use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
240    /// use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
241    /// use std::sync::Arc;
242    ///
243    /// fn main() -> Result<()> {
244    ///     // Example from the docstring:
245    ///     // Base projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
246    ///     let base = ProjectionExprs::new(vec![
247    ///         ProjectionExpr {
248    ///             expr: Arc::new(Column::new("c", 2)),
249    ///             alias: "x".to_string(),
250    ///         },
251    ///         ProjectionExpr {
252    ///             expr: Arc::new(Column::new("b", 1)),
253    ///             alias: "y".to_string(),
254    ///         },
255    ///         ProjectionExpr {
256    ///             expr: Arc::new(Column::new("a", 0)),
257    ///             alias: "z".to_string(),
258    ///         },
259    ///     ]);
260    ///
261    ///     // Top projection: SELECT x@0 + 1 AS c1, y@1 + z@2 AS c2
262    ///     let top = ProjectionExprs::new(vec![
263    ///         ProjectionExpr {
264    ///             expr: Arc::new(BinaryExpr::new(
265    ///                 Arc::new(Column::new("x", 0)),
266    ///                 Operator::Plus,
267    ///                 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
268    ///             )),
269    ///             alias: "c1".to_string(),
270    ///         },
271    ///         ProjectionExpr {
272    ///             expr: Arc::new(BinaryExpr::new(
273    ///                 Arc::new(Column::new("y", 1)),
274    ///                 Operator::Plus,
275    ///                 Arc::new(Column::new("z", 2)),
276    ///             )),
277    ///             alias: "c2".to_string(),
278    ///         },
279    ///     ]);
280    ///
281    ///     // Expected result: SELECT c@2 + 1 AS c1, b@1 + a@0 AS c2
282    ///     let result = base.try_merge(&top)?;
283    ///
284    ///     assert_eq!(result.as_ref().len(), 2);
285    ///     assert_eq!(result.as_ref()[0].alias, "c1");
286    ///     assert_eq!(result.as_ref()[1].alias, "c2");
287    ///
288    ///     Ok(())
289    /// }
290    /// ```
291    ///
292    /// # Errors
293    /// This function returns an error if any expression in the `other` projection cannot be
294    /// applied on top of this projection.
295    pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
296        let mut new_exprs = Vec::with_capacity(other.exprs.len());
297        for proj_expr in &other.exprs {
298            let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
299                .ok_or_else(|| {
300                    internal_datafusion_err!(
301                        "Failed to combine projections: expression {} could not be applied on top of existing projections {}",
302                        proj_expr.expr,
303                        self.exprs.iter().map(|e| format!("{e}")).join(", ")
304                    )
305                })?;
306            new_exprs.push(ProjectionExpr {
307                expr: new_expr,
308                alias: proj_expr.alias.clone(),
309            });
310        }
311        Ok(ProjectionExprs::new(new_exprs))
312    }
313
314    /// Extract the column indices used in this projection.
315    /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
316    /// this function would return `[0, 1]`.
317    /// Repeated indices are returned only once, and the order is ascending.
318    pub fn column_indices(&self) -> Vec<usize> {
319        self.exprs
320            .iter()
321            .flat_map(|e| collect_columns(&e.expr).into_iter().map(|col| col.index()))
322            .sorted_unstable()
323            .dedup()
324            .collect_vec()
325    }
326
327    /// Extract the ordered column indices for a column-only projection.
328    ///
329    /// This function assumes that all expressions in the projection are simple column references.
330    /// It returns the column indices in the order they appear in the projection.
331    ///
332    /// # Panics
333    ///
334    /// Panics if any expression in the projection is not a simple column reference. This includes:
335    /// - Computed expressions (e.g., `a + 1`, `CAST(a AS INT)`)
336    /// - Function calls (e.g., `UPPER(name)`, `SUM(amount)`)
337    /// - Literals (e.g., `42`, `'hello'`)
338    /// - Complex nested expressions (e.g., `CASE WHEN ... THEN ... END`)
339    ///
340    /// # Returns
341    ///
342    /// A vector of column indices in projection order. Unlike [`column_indices()`](Self::column_indices),
343    /// this function:
344    /// - Preserves the projection order (does not sort)
345    /// - Preserves duplicates (does not deduplicate)
346    ///
347    /// # Example
348    ///
349    /// For a projection `SELECT c, a, c` where `a` is at index 0 and `c` is at index 2,
350    /// this function would return `[2, 0, 2]`.
351    ///
352    /// Use [`column_indices()`](Self::column_indices) instead if the projection may contain
353    /// non-column expressions or if you need a deduplicated sorted list.
354    pub fn ordered_column_indices(&self) -> Vec<usize> {
355        self.exprs
356            .iter()
357            .map(|e| {
358                e.expr
359                    .as_any()
360                    .downcast_ref::<Column>()
361                    .expect("Expected column reference in projection")
362                    .index()
363            })
364            .collect()
365    }
366
367    /// Project a schema according to this projection.
368    /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
369    /// if the input schema is `[a: Int32, b: Int32, c: Int32]`, the output schema would be `[x: Int32, y: Int32]`.
370    /// Fields' metadata are preserved from the input schema.
371    pub fn project_schema(&self, input_schema: &Schema) -> Result<Schema> {
372        let fields: Result<Vec<Field>> = self
373            .exprs
374            .iter()
375            .map(|proj_expr| {
376                let metadata = proj_expr
377                    .expr
378                    .return_field(input_schema)?
379                    .metadata()
380                    .clone();
381
382                let field = Field::new(
383                    &proj_expr.alias,
384                    proj_expr.expr.data_type(input_schema)?,
385                    proj_expr.expr.nullable(input_schema)?,
386                )
387                .with_metadata(metadata);
388
389                Ok(field)
390            })
391            .collect();
392
393        Ok(Schema::new_with_metadata(
394            fields?,
395            input_schema.metadata().clone(),
396        ))
397    }
398
399    /// Project statistics according to this projection.
400    /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
401    /// if the input statistics has column statistics for columns `a`, `b`, and `c`, the output statistics would have column statistics for columns `x` and `y`.
402    pub fn project_statistics(
403        &self,
404        mut stats: datafusion_common::Statistics,
405        input_schema: &Schema,
406    ) -> Result<datafusion_common::Statistics> {
407        let mut primitive_row_size = 0;
408        let mut primitive_row_size_possible = true;
409        let mut column_statistics = vec![];
410
411        for proj_expr in &self.exprs {
412            let expr = &proj_expr.expr;
413            let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() {
414                stats.column_statistics[col.index()].clone()
415            } else {
416                // TODO stats: estimate more statistics from expressions
417                // (expressions should compute their statistics themselves)
418                ColumnStatistics::new_unknown()
419            };
420            column_statistics.push(col_stats);
421            let data_type = expr.data_type(input_schema)?;
422            if let Some(value) = data_type.primitive_width() {
423                primitive_row_size += value;
424                continue;
425            }
426            primitive_row_size_possible = false;
427        }
428
429        if primitive_row_size_possible {
430            stats.total_byte_size =
431                Precision::Exact(primitive_row_size).multiply(&stats.num_rows);
432        }
433        stats.column_statistics = column_statistics;
434        Ok(stats)
435    }
436}
437
438impl<'a> IntoIterator for &'a ProjectionExprs {
439    type Item = &'a ProjectionExpr;
440    type IntoIter = std::slice::Iter<'a, ProjectionExpr>;
441
442    fn into_iter(self) -> Self::IntoIter {
443        self.exprs.iter()
444    }
445}
446
447impl IntoIterator for ProjectionExprs {
448    type Item = ProjectionExpr;
449    type IntoIter = std::vec::IntoIter<ProjectionExpr>;
450
451    fn into_iter(self) -> Self::IntoIter {
452        self.exprs.into_iter()
453    }
454}
455
456/// The function operates in two modes:
457///
458/// 1) When `sync_with_child` is `true`:
459///
460///    The function updates the indices of `expr` if the expression resides
461///    in the input plan. For instance, given the expressions `a@1 + b@2`
462///    and `c@0` with the input schema `c@2, a@0, b@1`, the expressions are
463///    updated to `a@0 + b@1` and `c@2`.
464///
465/// 2) When `sync_with_child` is `false`:
466///
467///    The function determines how the expression would be updated if a projection
468///    was placed before the plan associated with the expression. If the expression
469///    cannot be rewritten after the projection, it returns `None`. For example,
470///    given the expressions `c@0`, `a@1` and `b@2`, and the projection with
471///    an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes
472///    `a@0`, but `b@2` results in `None` since the projection does not include `b`.
473///
474/// # Errors
475/// This function returns an error if `sync_with_child` is `true` and if any expression references
476/// an index that is out of bounds for `projected_exprs`.
477/// For example:
478///
479/// - `expr` is `a@3`
480/// - `projected_exprs` is \[`a@0`, `b@1`\]
481///
482/// In this case, `a@3` references index 3, which is out of bounds for `projected_exprs` (which has length 2).
483pub fn update_expr(
484    expr: &Arc<dyn PhysicalExpr>,
485    projected_exprs: &[ProjectionExpr],
486    sync_with_child: bool,
487) -> Result<Option<Arc<dyn PhysicalExpr>>> {
488    #[derive(Debug, PartialEq)]
489    enum RewriteState {
490        /// The expression is unchanged.
491        Unchanged,
492        /// Some part of the expression has been rewritten
493        RewrittenValid,
494        /// Some part of the expression has been rewritten, but some column
495        /// references could not be.
496        RewrittenInvalid,
497    }
498
499    let mut state = RewriteState::Unchanged;
500
501    let new_expr = Arc::clone(expr)
502        .transform_up(|expr| {
503            if state == RewriteState::RewrittenInvalid {
504                return Ok(Transformed::no(expr));
505            }
506
507            let Some(column) = expr.as_any().downcast_ref::<Column>() else {
508                return Ok(Transformed::no(expr));
509            };
510            if sync_with_child {
511                state = RewriteState::RewrittenValid;
512                // Update the index of `column`:
513                let projected_expr = projected_exprs.get(column.index()).ok_or_else(|| {
514                    internal_datafusion_err!(
515                        "Column index {} out of bounds for projected expressions of length {}",
516                        column.index(),
517                        projected_exprs.len()
518                    )
519                })?;
520                Ok(Transformed::yes(Arc::clone(&projected_expr.expr)))
521            } else {
522                // default to invalid, in case we can't find the relevant column
523                state = RewriteState::RewrittenInvalid;
524                // Determine how to update `column` to accommodate `projected_exprs`
525                projected_exprs
526                    .iter()
527                    .enumerate()
528                    .find_map(|(index, proj_expr)| {
529                        proj_expr.expr.as_any().downcast_ref::<Column>().and_then(
530                            |projected_column| {
531                                (column.name().eq(projected_column.name())
532                                    && column.index() == projected_column.index())
533                                .then(|| {
534                                    state = RewriteState::RewrittenValid;
535                                    Arc::new(Column::new(&proj_expr.alias, index)) as _
536                                })
537                            },
538                        )
539                    })
540                    .map_or_else(
541                        || Ok(Transformed::no(expr)),
542                        |c| Ok(Transformed::yes(c)),
543                    )
544            }
545        })
546        .data()?;
547
548    Ok((state == RewriteState::RewrittenValid).then_some(new_expr))
549}
550
551/// Stores target expressions, along with their indices, that associate with a
552/// source expression in a projection mapping.
553#[derive(Clone, Debug, Default)]
554pub struct ProjectionTargets {
555    /// A non-empty vector of pairs of target expressions and their indices.
556    /// Consider using a special non-empty collection type in the future (e.g.
557    /// if Rust provides one in the standard library).
558    exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
559}
560
561impl ProjectionTargets {
562    /// Returns the first target expression and its index.
563    pub fn first(&self) -> &(Arc<dyn PhysicalExpr>, usize) {
564        // Since the vector is non-empty, we can safely unwrap:
565        self.exprs_indices.first().unwrap()
566    }
567
568    /// Adds a target expression and its index to the list of targets.
569    pub fn push(&mut self, target: (Arc<dyn PhysicalExpr>, usize)) {
570        self.exprs_indices.push(target);
571    }
572}
573
574impl Deref for ProjectionTargets {
575    type Target = [(Arc<dyn PhysicalExpr>, usize)];
576
577    fn deref(&self) -> &Self::Target {
578        &self.exprs_indices
579    }
580}
581
582impl From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets {
583    fn from(exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>) -> Self {
584        Self { exprs_indices }
585    }
586}
587
588/// Stores the mapping between source expressions and target expressions for a
589/// projection.
590#[derive(Clone, Debug)]
591pub struct ProjectionMapping {
592    /// Mapping between source expressions and target expressions.
593    /// Vector indices correspond to the indices after projection.
594    map: IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>,
595}
596
597impl ProjectionMapping {
598    /// Constructs the mapping between a projection's input and output
599    /// expressions.
600    ///
601    /// For example, given the input projection expressions (`a + b`, `c + d`)
602    /// and an output schema with two columns `"c + d"` and `"a + b"`, the
603    /// projection mapping would be:
604    ///
605    /// ```text
606    ///  [0]: (c + d, [(col("c + d"), 0)])
607    ///  [1]: (a + b, [(col("a + b"), 1)])
608    /// ```
609    ///
610    /// where `col("c + d")` means the column named `"c + d"`.
611    pub fn try_new(
612        expr: impl IntoIterator<Item = (Arc<dyn PhysicalExpr>, String)>,
613        input_schema: &SchemaRef,
614    ) -> Result<Self> {
615        // Construct a map from the input expressions to the output expression of the projection:
616        let mut map = IndexMap::<_, ProjectionTargets>::new();
617        for (expr_idx, (expr, name)) in expr.into_iter().enumerate() {
618            let target_expr = Arc::new(Column::new(&name, expr_idx)) as _;
619            let source_expr = expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
620                Some(col) => {
621                    // Sometimes, an expression and its name in the input_schema
622                    // doesn't match. This can cause problems, so we make sure
623                    // that the expression name matches with the name in `input_schema`.
624                    // Conceptually, `source_expr` and `expression` should be the same.
625                    let idx = col.index();
626                    let matching_field = input_schema.field(idx);
627                    let matching_name = matching_field.name();
628                    if col.name() != matching_name {
629                        return internal_err!(
630                            "Input field name {} does not match with the projection expression {}",
631                            matching_name,
632                            col.name()
633                        );
634                    }
635                    let matching_column = Column::new(matching_name, idx);
636                    Ok(Transformed::yes(Arc::new(matching_column)))
637                }
638                None => Ok(Transformed::no(e)),
639            })
640            .data()?;
641            map.entry(source_expr)
642                .or_default()
643                .push((target_expr, expr_idx));
644        }
645        Ok(Self { map })
646    }
647
648    /// Constructs a subset mapping using the provided indices.
649    ///
650    /// This is used when the output is a subset of the input without any
651    /// other transformations. The indices are for columns in the schema.
652    pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Result<Self> {
653        let projection_exprs = indices.iter().map(|index| {
654            let field = schema.field(*index);
655            let column = Arc::new(Column::new(field.name(), *index));
656            (column as _, field.name().clone())
657        });
658        ProjectionMapping::try_new(projection_exprs, schema)
659    }
660}
661
662impl Deref for ProjectionMapping {
663    type Target = IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>;
664
665    fn deref(&self) -> &Self::Target {
666        &self.map
667    }
668}
669
670impl FromIterator<(Arc<dyn PhysicalExpr>, ProjectionTargets)> for ProjectionMapping {
671    fn from_iter<T: IntoIterator<Item = (Arc<dyn PhysicalExpr>, ProjectionTargets)>>(
672        iter: T,
673    ) -> Self {
674        Self {
675            map: IndexMap::from_iter(iter),
676        }
677    }
678}
679
680/// Projects a slice of [LexOrdering]s onto the given schema.
681///
682/// This is a convenience wrapper that applies [project_ordering] to each
683/// input ordering and collects the successful projections:
684/// - For each input ordering, the result of [project_ordering] is appended to
685///   the output if it is `Some(...)`.
686/// - Order is preserved and no deduplication is attempted.
687/// - If none of the input orderings can be projected, an empty `Vec` is
688///   returned.
689///
690/// See [project_ordering] for the semantics of projecting a single
691/// [LexOrdering].
692pub fn project_orderings(
693    orderings: &[LexOrdering],
694    schema: &SchemaRef,
695) -> Vec<LexOrdering> {
696    let mut projected_orderings = vec![];
697
698    for ordering in orderings {
699        projected_orderings.extend(project_ordering(ordering, schema));
700    }
701
702    projected_orderings
703}
704
705/// Projects a single [LexOrdering] onto the given schema.
706///
707/// This function attempts to rewrite every [PhysicalSortExpr] in the provided
708/// [LexOrdering] so that any [Column] expressions point at the correct field
709/// indices in `schema`.
710///
711/// Key details:
712/// - Columns are matched by name, not by index. The index of each matched
713///   column is looked up with [Schema::column_with_name](arrow::datatypes::Schema::column_with_name) and a new
714///   [Column] with the correct [index](Column::index) is substituted.
715/// - If an expression references a column name that does not exist in
716///   `schema`, projection of the current ordering stops and only the already
717///   rewritten prefix is kept. This models the fact that a lexicographical
718///   ordering remains valid for any leading prefix whose expressions are
719///   present in the projected schema.
720/// - If no expressions can be projected (i.e. the first one is missing), the
721///   function returns `None`.
722///
723/// Return value:
724/// - `Some(LexOrdering)` if at least one sort expression could be projected.
725///   The returned ordering may be a strict prefix of the input ordering.
726/// - `None` if no part of the ordering can be projected onto `schema`.
727///
728/// Example
729///
730/// Suppose we have an input ordering `[col("a@0"), col("b@1")]` but the projected
731/// schema only contains b and not a. The result will be `Some([col("a@0")])`. In other
732/// words, the column reference is reindexed to match the projected schema.
733/// If neither a nor b is present, the result will be None.
734pub fn project_ordering(
735    ordering: &LexOrdering,
736    schema: &SchemaRef,
737) -> Option<LexOrdering> {
738    let mut projected_exprs = vec![];
739    for PhysicalSortExpr { expr, options } in ordering.iter() {
740        let transformed = Arc::clone(expr).transform_up(|expr| {
741            let Some(col) = expr.as_any().downcast_ref::<Column>() else {
742                return Ok(Transformed::no(expr));
743            };
744
745            let name = col.name();
746            if let Some((idx, _)) = schema.column_with_name(name) {
747                // Compute the new column expression (with correct index) after projection:
748                Ok(Transformed::yes(Arc::new(Column::new(name, idx))))
749            } else {
750                // Cannot find expression in the projected_schema,
751                // signal this using an Err result
752                plan_err!("")
753            }
754        });
755
756        match transformed {
757            Ok(transformed) => {
758                projected_exprs.push(PhysicalSortExpr::new(transformed.data, *options));
759            }
760            Err(_) => {
761                // Err result indicates an expression could not be found in the
762                // projected_schema, stop iterating since rest of the orderings are violated
763                break;
764            }
765        }
766    }
767
768    LexOrdering::new(projected_exprs)
769}
770
771#[cfg(test)]
772pub(crate) mod tests {
773    use std::collections::HashMap;
774
775    use super::*;
776    use crate::equivalence::{convert_to_orderings, EquivalenceProperties};
777    use crate::expressions::{col, BinaryExpr, Literal};
778    use crate::utils::tests::TestScalarUDF;
779    use crate::{PhysicalExprRef, ScalarFunctionExpr};
780
781    use arrow::compute::SortOptions;
782    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
783    use datafusion_common::config::ConfigOptions;
784    use datafusion_common::{ScalarValue, Statistics};
785    use datafusion_expr::{Operator, ScalarUDF};
786    use insta::assert_snapshot;
787
788    pub(crate) fn output_schema(
789        mapping: &ProjectionMapping,
790        input_schema: &Arc<Schema>,
791    ) -> Result<SchemaRef> {
792        // Calculate output schema:
793        let mut fields = vec![];
794        for (source, targets) in mapping.iter() {
795            let data_type = source.data_type(input_schema)?;
796            let nullable = source.nullable(input_schema)?;
797            for (target, _) in targets.iter() {
798                let Some(column) = target.as_any().downcast_ref::<Column>() else {
799                    return plan_err!("Expects to have column");
800                };
801                fields.push(Field::new(column.name(), data_type.clone(), nullable));
802            }
803        }
804
805        let output_schema = Arc::new(Schema::new_with_metadata(
806            fields,
807            input_schema.metadata().clone(),
808        ));
809
810        Ok(output_schema)
811    }
812
813    #[test]
814    fn project_orderings() -> Result<()> {
815        let schema = Arc::new(Schema::new(vec![
816            Field::new("a", DataType::Int32, true),
817            Field::new("b", DataType::Int32, true),
818            Field::new("c", DataType::Int32, true),
819            Field::new("d", DataType::Int32, true),
820            Field::new("e", DataType::Int32, true),
821            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
822        ]));
823        let col_a = &col("a", &schema)?;
824        let col_b = &col("b", &schema)?;
825        let col_c = &col("c", &schema)?;
826        let col_d = &col("d", &schema)?;
827        let col_e = &col("e", &schema)?;
828        let col_ts = &col("ts", &schema)?;
829        let a_plus_b = Arc::new(BinaryExpr::new(
830            Arc::clone(col_a),
831            Operator::Plus,
832            Arc::clone(col_b),
833        )) as Arc<dyn PhysicalExpr>;
834        let b_plus_d = Arc::new(BinaryExpr::new(
835            Arc::clone(col_b),
836            Operator::Plus,
837            Arc::clone(col_d),
838        )) as Arc<dyn PhysicalExpr>;
839        let b_plus_e = Arc::new(BinaryExpr::new(
840            Arc::clone(col_b),
841            Operator::Plus,
842            Arc::clone(col_e),
843        )) as Arc<dyn PhysicalExpr>;
844        let c_plus_d = Arc::new(BinaryExpr::new(
845            Arc::clone(col_c),
846            Operator::Plus,
847            Arc::clone(col_d),
848        )) as Arc<dyn PhysicalExpr>;
849
850        let option_asc = SortOptions {
851            descending: false,
852            nulls_first: false,
853        };
854        let option_desc = SortOptions {
855            descending: true,
856            nulls_first: true,
857        };
858
859        let test_cases = vec![
860            // ---------- TEST CASE 1 ------------
861            (
862                // orderings
863                vec![
864                    // [b ASC]
865                    vec![(col_b, option_asc)],
866                ],
867                // projection exprs
868                vec![(col_b, "b_new".to_string()), (col_a, "a_new".to_string())],
869                // expected
870                vec![
871                    // [b_new ASC]
872                    vec![("b_new", option_asc)],
873                ],
874            ),
875            // ---------- TEST CASE 2 ------------
876            (
877                // orderings
878                vec![
879                    // empty ordering
880                ],
881                // projection exprs
882                vec![(col_c, "c_new".to_string()), (col_b, "b_new".to_string())],
883                // expected
884                vec![
885                    // no ordering at the output
886                ],
887            ),
888            // ---------- TEST CASE 3 ------------
889            (
890                // orderings
891                vec![
892                    // [ts ASC]
893                    vec![(col_ts, option_asc)],
894                ],
895                // projection exprs
896                vec![
897                    (col_b, "b_new".to_string()),
898                    (col_a, "a_new".to_string()),
899                    (col_ts, "ts_new".to_string()),
900                ],
901                // expected
902                vec![
903                    // [ts_new ASC]
904                    vec![("ts_new", option_asc)],
905                ],
906            ),
907            // ---------- TEST CASE 4 ------------
908            (
909                // orderings
910                vec![
911                    // [a ASC, ts ASC]
912                    vec![(col_a, option_asc), (col_ts, option_asc)],
913                    // [b ASC, ts ASC]
914                    vec![(col_b, option_asc), (col_ts, option_asc)],
915                ],
916                // projection exprs
917                vec![
918                    (col_b, "b_new".to_string()),
919                    (col_a, "a_new".to_string()),
920                    (col_ts, "ts_new".to_string()),
921                ],
922                // expected
923                vec![
924                    // [a_new ASC, ts_new ASC]
925                    vec![("a_new", option_asc), ("ts_new", option_asc)],
926                    // [b_new ASC, ts_new ASC]
927                    vec![("b_new", option_asc), ("ts_new", option_asc)],
928                ],
929            ),
930            // ---------- TEST CASE 5 ------------
931            (
932                // orderings
933                vec![
934                    // [a + b ASC]
935                    vec![(&a_plus_b, option_asc)],
936                ],
937                // projection exprs
938                vec![
939                    (col_b, "b_new".to_string()),
940                    (col_a, "a_new".to_string()),
941                    (&a_plus_b, "a+b".to_string()),
942                ],
943                // expected
944                vec![
945                    // [a + b ASC]
946                    vec![("a+b", option_asc)],
947                ],
948            ),
949            // ---------- TEST CASE 6 ------------
950            (
951                // orderings
952                vec![
953                    // [a + b ASC, c ASC]
954                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
955                ],
956                // projection exprs
957                vec![
958                    (col_b, "b_new".to_string()),
959                    (col_a, "a_new".to_string()),
960                    (col_c, "c_new".to_string()),
961                    (&a_plus_b, "a+b".to_string()),
962                ],
963                // expected
964                vec![
965                    // [a + b ASC, c_new ASC]
966                    vec![("a+b", option_asc), ("c_new", option_asc)],
967                ],
968            ),
969            // ------- TEST CASE 7 ----------
970            (
971                vec![
972                    // [a ASC, b ASC, c ASC]
973                    vec![(col_a, option_asc), (col_b, option_asc)],
974                    // [a ASC, d ASC]
975                    vec![(col_a, option_asc), (col_d, option_asc)],
976                ],
977                // b as b_new, a as a_new, d as d_new b+d
978                vec![
979                    (col_b, "b_new".to_string()),
980                    (col_a, "a_new".to_string()),
981                    (col_d, "d_new".to_string()),
982                    (&b_plus_d, "b+d".to_string()),
983                ],
984                // expected
985                vec![
986                    // [a_new ASC, b_new ASC]
987                    vec![("a_new", option_asc), ("b_new", option_asc)],
988                    // [a_new ASC, d_new ASC]
989                    vec![("a_new", option_asc), ("d_new", option_asc)],
990                    // [a_new ASC, b+d ASC]
991                    vec![("a_new", option_asc), ("b+d", option_asc)],
992                ],
993            ),
994            // ------- TEST CASE 8 ----------
995            (
996                // orderings
997                vec![
998                    // [b+d ASC]
999                    vec![(&b_plus_d, option_asc)],
1000                ],
1001                // proj exprs
1002                vec![
1003                    (col_b, "b_new".to_string()),
1004                    (col_a, "a_new".to_string()),
1005                    (col_d, "d_new".to_string()),
1006                    (&b_plus_d, "b+d".to_string()),
1007                ],
1008                // expected
1009                vec![
1010                    // [b+d ASC]
1011                    vec![("b+d", option_asc)],
1012                ],
1013            ),
1014            // ------- TEST CASE 9 ----------
1015            (
1016                // orderings
1017                vec![
1018                    // [a ASC, d ASC, b ASC]
1019                    vec![
1020                        (col_a, option_asc),
1021                        (col_d, option_asc),
1022                        (col_b, option_asc),
1023                    ],
1024                    // [c ASC]
1025                    vec![(col_c, option_asc)],
1026                ],
1027                // proj exprs
1028                vec![
1029                    (col_b, "b_new".to_string()),
1030                    (col_a, "a_new".to_string()),
1031                    (col_d, "d_new".to_string()),
1032                    (col_c, "c_new".to_string()),
1033                ],
1034                // expected
1035                vec![
1036                    // [a_new ASC, d_new ASC, b_new ASC]
1037                    vec![
1038                        ("a_new", option_asc),
1039                        ("d_new", option_asc),
1040                        ("b_new", option_asc),
1041                    ],
1042                    // [c_new ASC],
1043                    vec![("c_new", option_asc)],
1044                ],
1045            ),
1046            // ------- TEST CASE 10 ----------
1047            (
1048                vec![
1049                    // [a ASC, b ASC, c ASC]
1050                    vec![
1051                        (col_a, option_asc),
1052                        (col_b, option_asc),
1053                        (col_c, option_asc),
1054                    ],
1055                    // [a ASC, d ASC]
1056                    vec![(col_a, option_asc), (col_d, option_asc)],
1057                ],
1058                // proj exprs
1059                vec![
1060                    (col_b, "b_new".to_string()),
1061                    (col_a, "a_new".to_string()),
1062                    (col_c, "c_new".to_string()),
1063                    (&c_plus_d, "c+d".to_string()),
1064                ],
1065                // expected
1066                vec![
1067                    // [a_new ASC, b_new ASC, c_new ASC]
1068                    vec![
1069                        ("a_new", option_asc),
1070                        ("b_new", option_asc),
1071                        ("c_new", option_asc),
1072                    ],
1073                    // [a_new ASC, b_new ASC, c+d ASC]
1074                    vec![
1075                        ("a_new", option_asc),
1076                        ("b_new", option_asc),
1077                        ("c+d", option_asc),
1078                    ],
1079                ],
1080            ),
1081            // ------- TEST CASE 11 ----------
1082            (
1083                // orderings
1084                vec![
1085                    // [a ASC, b ASC]
1086                    vec![(col_a, option_asc), (col_b, option_asc)],
1087                    // [a ASC, d ASC]
1088                    vec![(col_a, option_asc), (col_d, option_asc)],
1089                ],
1090                // proj exprs
1091                vec![
1092                    (col_b, "b_new".to_string()),
1093                    (col_a, "a_new".to_string()),
1094                    (&b_plus_d, "b+d".to_string()),
1095                ],
1096                // expected
1097                vec![
1098                    // [a_new ASC, b_new ASC]
1099                    vec![("a_new", option_asc), ("b_new", option_asc)],
1100                    // [a_new ASC, b + d ASC]
1101                    vec![("a_new", option_asc), ("b+d", option_asc)],
1102                ],
1103            ),
1104            // ------- TEST CASE 12 ----------
1105            (
1106                // orderings
1107                vec![
1108                    // [a ASC, b ASC, c ASC]
1109                    vec![
1110                        (col_a, option_asc),
1111                        (col_b, option_asc),
1112                        (col_c, option_asc),
1113                    ],
1114                ],
1115                // proj exprs
1116                vec![(col_c, "c_new".to_string()), (col_a, "a_new".to_string())],
1117                // expected
1118                vec![
1119                    // [a_new ASC]
1120                    vec![("a_new", option_asc)],
1121                ],
1122            ),
1123            // ------- TEST CASE 13 ----------
1124            (
1125                // orderings
1126                vec![
1127                    // [a ASC, b ASC, c ASC]
1128                    vec![
1129                        (col_a, option_asc),
1130                        (col_b, option_asc),
1131                        (col_c, option_asc),
1132                    ],
1133                    // [a ASC, a + b ASC, c ASC]
1134                    vec![
1135                        (col_a, option_asc),
1136                        (&a_plus_b, option_asc),
1137                        (col_c, option_asc),
1138                    ],
1139                ],
1140                // proj exprs
1141                vec![
1142                    (col_c, "c_new".to_string()),
1143                    (col_b, "b_new".to_string()),
1144                    (col_a, "a_new".to_string()),
1145                    (&a_plus_b, "a+b".to_string()),
1146                ],
1147                // expected
1148                vec![
1149                    // [a_new ASC, b_new ASC, c_new ASC]
1150                    vec![
1151                        ("a_new", option_asc),
1152                        ("b_new", option_asc),
1153                        ("c_new", option_asc),
1154                    ],
1155                    // [a_new ASC, a+b ASC, c_new ASC]
1156                    vec![
1157                        ("a_new", option_asc),
1158                        ("a+b", option_asc),
1159                        ("c_new", option_asc),
1160                    ],
1161                ],
1162            ),
1163            // ------- TEST CASE 14 ----------
1164            (
1165                // orderings
1166                vec![
1167                    // [a ASC, b ASC]
1168                    vec![(col_a, option_asc), (col_b, option_asc)],
1169                    // [c ASC, b ASC]
1170                    vec![(col_c, option_asc), (col_b, option_asc)],
1171                    // [d ASC, e ASC]
1172                    vec![(col_d, option_asc), (col_e, option_asc)],
1173                ],
1174                // proj exprs
1175                vec![
1176                    (col_c, "c_new".to_string()),
1177                    (col_d, "d_new".to_string()),
1178                    (col_a, "a_new".to_string()),
1179                    (&b_plus_e, "b+e".to_string()),
1180                ],
1181                // expected
1182                vec![
1183                    // [a_new ASC, d_new ASC, b+e ASC]
1184                    vec![
1185                        ("a_new", option_asc),
1186                        ("d_new", option_asc),
1187                        ("b+e", option_asc),
1188                    ],
1189                    // [d_new ASC, a_new ASC, b+e ASC]
1190                    vec![
1191                        ("d_new", option_asc),
1192                        ("a_new", option_asc),
1193                        ("b+e", option_asc),
1194                    ],
1195                    // [c_new ASC, d_new ASC, b+e ASC]
1196                    vec![
1197                        ("c_new", option_asc),
1198                        ("d_new", option_asc),
1199                        ("b+e", option_asc),
1200                    ],
1201                    // [d_new ASC, c_new ASC, b+e ASC]
1202                    vec![
1203                        ("d_new", option_asc),
1204                        ("c_new", option_asc),
1205                        ("b+e", option_asc),
1206                    ],
1207                ],
1208            ),
1209            // ------- TEST CASE 15 ----------
1210            (
1211                // orderings
1212                vec![
1213                    // [a ASC, c ASC, b ASC]
1214                    vec![
1215                        (col_a, option_asc),
1216                        (col_c, option_asc),
1217                        (col_b, option_asc),
1218                    ],
1219                ],
1220                // proj exprs
1221                vec![
1222                    (col_c, "c_new".to_string()),
1223                    (col_a, "a_new".to_string()),
1224                    (&a_plus_b, "a+b".to_string()),
1225                ],
1226                // expected
1227                vec![
1228                    // [a_new ASC, d_new ASC, b+e ASC]
1229                    vec![
1230                        ("a_new", option_asc),
1231                        ("c_new", option_asc),
1232                        ("a+b", option_asc),
1233                    ],
1234                ],
1235            ),
1236            // ------- TEST CASE 16 ----------
1237            (
1238                // orderings
1239                vec![
1240                    // [a ASC, b ASC]
1241                    vec![(col_a, option_asc), (col_b, option_asc)],
1242                    // [c ASC, b DESC]
1243                    vec![(col_c, option_asc), (col_b, option_desc)],
1244                    // [e ASC]
1245                    vec![(col_e, option_asc)],
1246                ],
1247                // proj exprs
1248                vec![
1249                    (col_c, "c_new".to_string()),
1250                    (col_a, "a_new".to_string()),
1251                    (col_b, "b_new".to_string()),
1252                    (&b_plus_e, "b+e".to_string()),
1253                ],
1254                // expected
1255                vec![
1256                    // [a_new ASC, b_new ASC]
1257                    vec![("a_new", option_asc), ("b_new", option_asc)],
1258                    // [a_new ASC, b_new ASC]
1259                    vec![("a_new", option_asc), ("b+e", option_asc)],
1260                    // [c_new ASC, b_new DESC]
1261                    vec![("c_new", option_asc), ("b_new", option_desc)],
1262                ],
1263            ),
1264        ];
1265
1266        for (idx, (orderings, proj_exprs, expected)) in test_cases.into_iter().enumerate()
1267        {
1268            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1269
1270            let orderings = convert_to_orderings(&orderings);
1271            eq_properties.add_orderings(orderings);
1272
1273            let proj_exprs = proj_exprs
1274                .into_iter()
1275                .map(|(expr, name)| (Arc::clone(expr), name));
1276            let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
1277            let output_schema = output_schema(&projection_mapping, &schema)?;
1278
1279            let expected = expected
1280                .into_iter()
1281                .map(|ordering| {
1282                    ordering
1283                        .into_iter()
1284                        .map(|(name, options)| {
1285                            (col(name, &output_schema).unwrap(), options)
1286                        })
1287                        .collect::<Vec<_>>()
1288                })
1289                .collect::<Vec<_>>();
1290            let expected = convert_to_orderings(&expected);
1291
1292            let projected_eq = eq_properties.project(&projection_mapping, output_schema);
1293            let orderings = projected_eq.oeq_class();
1294
1295            let err_msg = format!(
1296                "test_idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
1297            );
1298
1299            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
1300            for expected_ordering in &expected {
1301                assert!(orderings.contains(expected_ordering), "{}", err_msg)
1302            }
1303        }
1304
1305        Ok(())
1306    }
1307
1308    #[test]
1309    fn project_orderings2() -> Result<()> {
1310        let schema = Arc::new(Schema::new(vec![
1311            Field::new("a", DataType::Int32, true),
1312            Field::new("b", DataType::Int32, true),
1313            Field::new("c", DataType::Int32, true),
1314            Field::new("d", DataType::Int32, true),
1315            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
1316        ]));
1317        let col_a = &col("a", &schema)?;
1318        let col_b = &col("b", &schema)?;
1319        let col_c = &col("c", &schema)?;
1320        let col_ts = &col("ts", &schema)?;
1321        let a_plus_b = Arc::new(BinaryExpr::new(
1322            Arc::clone(col_a),
1323            Operator::Plus,
1324            Arc::clone(col_b),
1325        )) as Arc<dyn PhysicalExpr>;
1326
1327        let test_fun = Arc::new(ScalarUDF::new_from_impl(TestScalarUDF::new()));
1328
1329        let round_c = Arc::new(ScalarFunctionExpr::try_new(
1330            test_fun,
1331            vec![Arc::clone(col_c)],
1332            &schema,
1333            Arc::new(ConfigOptions::default()),
1334        )?) as PhysicalExprRef;
1335
1336        let option_asc = SortOptions {
1337            descending: false,
1338            nulls_first: false,
1339        };
1340
1341        let proj_exprs = vec![
1342            (col_b, "b_new".to_string()),
1343            (col_a, "a_new".to_string()),
1344            (col_c, "c_new".to_string()),
1345            (&round_c, "round_c_res".to_string()),
1346        ];
1347        let proj_exprs = proj_exprs
1348            .into_iter()
1349            .map(|(expr, name)| (Arc::clone(expr), name));
1350        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
1351        let output_schema = output_schema(&projection_mapping, &schema)?;
1352
1353        let col_a_new = &col("a_new", &output_schema)?;
1354        let col_b_new = &col("b_new", &output_schema)?;
1355        let col_c_new = &col("c_new", &output_schema)?;
1356        let col_round_c_res = &col("round_c_res", &output_schema)?;
1357        let a_new_plus_b_new = Arc::new(BinaryExpr::new(
1358            Arc::clone(col_a_new),
1359            Operator::Plus,
1360            Arc::clone(col_b_new),
1361        )) as Arc<dyn PhysicalExpr>;
1362
1363        let test_cases = [
1364            // ---------- TEST CASE 1 ------------
1365            (
1366                // orderings
1367                vec![
1368                    // [a ASC]
1369                    vec![(col_a, option_asc)],
1370                ],
1371                // expected
1372                vec![
1373                    // [b_new ASC]
1374                    vec![(col_a_new, option_asc)],
1375                ],
1376            ),
1377            // ---------- TEST CASE 2 ------------
1378            (
1379                // orderings
1380                vec![
1381                    // [a+b ASC]
1382                    vec![(&a_plus_b, option_asc)],
1383                ],
1384                // expected
1385                vec![
1386                    // [b_new ASC]
1387                    vec![(&a_new_plus_b_new, option_asc)],
1388                ],
1389            ),
1390            // ---------- TEST CASE 3 ------------
1391            (
1392                // orderings
1393                vec![
1394                    // [a ASC, ts ASC]
1395                    vec![(col_a, option_asc), (col_ts, option_asc)],
1396                ],
1397                // expected
1398                vec![
1399                    // [a_new ASC, date_bin_res ASC]
1400                    vec![(col_a_new, option_asc)],
1401                ],
1402            ),
1403            // ---------- TEST CASE 4 ------------
1404            (
1405                // orderings
1406                vec![
1407                    // [a ASC, ts ASC, b ASC]
1408                    vec![
1409                        (col_a, option_asc),
1410                        (col_ts, option_asc),
1411                        (col_b, option_asc),
1412                    ],
1413                ],
1414                // expected
1415                vec![
1416                    // [a_new ASC, date_bin_res ASC]
1417                    vec![(col_a_new, option_asc)],
1418                ],
1419            ),
1420            // ---------- TEST CASE 5 ------------
1421            (
1422                // orderings
1423                vec![
1424                    // [a ASC, c ASC]
1425                    vec![(col_a, option_asc), (col_c, option_asc)],
1426                ],
1427                // expected
1428                vec![
1429                    // [a_new ASC, round_c_res ASC, c_new ASC]
1430                    vec![(col_a_new, option_asc), (col_round_c_res, option_asc)],
1431                    // [a_new ASC, c_new ASC]
1432                    vec![(col_a_new, option_asc), (col_c_new, option_asc)],
1433                ],
1434            ),
1435            // ---------- TEST CASE 6 ------------
1436            (
1437                // orderings
1438                vec![
1439                    // [c ASC, b ASC]
1440                    vec![(col_c, option_asc), (col_b, option_asc)],
1441                ],
1442                // expected
1443                vec![
1444                    // [round_c_res ASC]
1445                    vec![(col_round_c_res, option_asc)],
1446                    // [c_new ASC, b_new ASC]
1447                    vec![(col_c_new, option_asc), (col_b_new, option_asc)],
1448                ],
1449            ),
1450            // ---------- TEST CASE 7 ------------
1451            (
1452                // orderings
1453                vec![
1454                    // [a+b ASC, c ASC]
1455                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
1456                ],
1457                // expected
1458                vec![
1459                    // [a+b ASC, round(c) ASC, c_new ASC]
1460                    vec![
1461                        (&a_new_plus_b_new, option_asc),
1462                        (col_round_c_res, option_asc),
1463                    ],
1464                    // [a+b ASC, c_new ASC]
1465                    vec![(&a_new_plus_b_new, option_asc), (col_c_new, option_asc)],
1466                ],
1467            ),
1468        ];
1469
1470        for (idx, (orderings, expected)) in test_cases.iter().enumerate() {
1471            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1472
1473            let orderings = convert_to_orderings(orderings);
1474            eq_properties.add_orderings(orderings);
1475
1476            let expected = convert_to_orderings(expected);
1477
1478            let projected_eq =
1479                eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
1480            let orderings = projected_eq.oeq_class();
1481
1482            let err_msg = format!(
1483                "test idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
1484            );
1485
1486            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
1487            for expected_ordering in &expected {
1488                assert!(orderings.contains(expected_ordering), "{}", err_msg)
1489            }
1490        }
1491        Ok(())
1492    }
1493
1494    #[test]
1495    fn project_orderings3() -> Result<()> {
1496        let schema = Arc::new(Schema::new(vec![
1497            Field::new("a", DataType::Int32, true),
1498            Field::new("b", DataType::Int32, true),
1499            Field::new("c", DataType::Int32, true),
1500            Field::new("d", DataType::Int32, true),
1501            Field::new("e", DataType::Int32, true),
1502            Field::new("f", DataType::Int32, true),
1503        ]));
1504        let col_a = &col("a", &schema)?;
1505        let col_b = &col("b", &schema)?;
1506        let col_c = &col("c", &schema)?;
1507        let col_d = &col("d", &schema)?;
1508        let col_e = &col("e", &schema)?;
1509        let col_f = &col("f", &schema)?;
1510        let a_plus_b = Arc::new(BinaryExpr::new(
1511            Arc::clone(col_a),
1512            Operator::Plus,
1513            Arc::clone(col_b),
1514        )) as Arc<dyn PhysicalExpr>;
1515
1516        let option_asc = SortOptions {
1517            descending: false,
1518            nulls_first: false,
1519        };
1520
1521        let proj_exprs = vec![
1522            (col_c, "c_new".to_string()),
1523            (col_d, "d_new".to_string()),
1524            (&a_plus_b, "a+b".to_string()),
1525        ];
1526        let proj_exprs = proj_exprs
1527            .into_iter()
1528            .map(|(expr, name)| (Arc::clone(expr), name));
1529        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
1530        let output_schema = output_schema(&projection_mapping, &schema)?;
1531
1532        let col_a_plus_b_new = &col("a+b", &output_schema)?;
1533        let col_c_new = &col("c_new", &output_schema)?;
1534        let col_d_new = &col("d_new", &output_schema)?;
1535
1536        let test_cases = vec![
1537            // ---------- TEST CASE 1 ------------
1538            (
1539                // orderings
1540                vec![
1541                    // [d ASC, b ASC]
1542                    vec![(col_d, option_asc), (col_b, option_asc)],
1543                    // [c ASC, a ASC]
1544                    vec![(col_c, option_asc), (col_a, option_asc)],
1545                ],
1546                // equal conditions
1547                vec![],
1548                // expected
1549                vec![
1550                    // [d_new ASC, c_new ASC, a+b ASC]
1551                    vec![
1552                        (col_d_new, option_asc),
1553                        (col_c_new, option_asc),
1554                        (col_a_plus_b_new, option_asc),
1555                    ],
1556                    // [c_new ASC, d_new ASC, a+b ASC]
1557                    vec![
1558                        (col_c_new, option_asc),
1559                        (col_d_new, option_asc),
1560                        (col_a_plus_b_new, option_asc),
1561                    ],
1562                ],
1563            ),
1564            // ---------- TEST CASE 2 ------------
1565            (
1566                // orderings
1567                vec![
1568                    // [d ASC, b ASC]
1569                    vec![(col_d, option_asc), (col_b, option_asc)],
1570                    // [c ASC, e ASC], Please note that a=e
1571                    vec![(col_c, option_asc), (col_e, option_asc)],
1572                ],
1573                // equal conditions
1574                vec![(col_e, col_a)],
1575                // expected
1576                vec![
1577                    // [d_new ASC, c_new ASC, a+b ASC]
1578                    vec![
1579                        (col_d_new, option_asc),
1580                        (col_c_new, option_asc),
1581                        (col_a_plus_b_new, option_asc),
1582                    ],
1583                    // [c_new ASC, d_new ASC, a+b ASC]
1584                    vec![
1585                        (col_c_new, option_asc),
1586                        (col_d_new, option_asc),
1587                        (col_a_plus_b_new, option_asc),
1588                    ],
1589                ],
1590            ),
1591            // ---------- TEST CASE 3 ------------
1592            (
1593                // orderings
1594                vec![
1595                    // [d ASC, b ASC]
1596                    vec![(col_d, option_asc), (col_b, option_asc)],
1597                    // [c ASC, e ASC], Please note that a=f
1598                    vec![(col_c, option_asc), (col_e, option_asc)],
1599                ],
1600                // equal conditions
1601                vec![(col_a, col_f)],
1602                // expected
1603                vec![
1604                    // [d_new ASC]
1605                    vec![(col_d_new, option_asc)],
1606                    // [c_new ASC]
1607                    vec![(col_c_new, option_asc)],
1608                ],
1609            ),
1610        ];
1611        for (orderings, equal_columns, expected) in test_cases {
1612            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1613            for (lhs, rhs) in equal_columns {
1614                eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?;
1615            }
1616
1617            let orderings = convert_to_orderings(&orderings);
1618            eq_properties.add_orderings(orderings);
1619
1620            let expected = convert_to_orderings(&expected);
1621
1622            let projected_eq =
1623                eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
1624            let orderings = projected_eq.oeq_class();
1625
1626            let err_msg = format!(
1627                "actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
1628            );
1629
1630            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
1631            for expected_ordering in &expected {
1632                assert!(orderings.contains(expected_ordering), "{}", err_msg)
1633            }
1634        }
1635
1636        Ok(())
1637    }
1638
1639    fn get_stats() -> Statistics {
1640        Statistics {
1641            num_rows: Precision::Exact(5),
1642            total_byte_size: Precision::Exact(23),
1643            column_statistics: vec![
1644                ColumnStatistics {
1645                    distinct_count: Precision::Exact(5),
1646                    max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
1647                    min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
1648                    sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
1649                    null_count: Precision::Exact(0),
1650                },
1651                ColumnStatistics {
1652                    distinct_count: Precision::Exact(1),
1653                    max_value: Precision::Exact(ScalarValue::from("x")),
1654                    min_value: Precision::Exact(ScalarValue::from("a")),
1655                    sum_value: Precision::Absent,
1656                    null_count: Precision::Exact(3),
1657                },
1658                ColumnStatistics {
1659                    distinct_count: Precision::Absent,
1660                    max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
1661                    min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
1662                    sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
1663                    null_count: Precision::Absent,
1664                },
1665            ],
1666        }
1667    }
1668
1669    fn get_schema() -> Schema {
1670        let field_0 = Field::new("col0", DataType::Int64, false);
1671        let field_1 = Field::new("col1", DataType::Utf8, false);
1672        let field_2 = Field::new("col2", DataType::Float32, false);
1673        Schema::new(vec![field_0, field_1, field_2])
1674    }
1675
#[test]
fn test_stats_projection_columns_only() {
    // Un-aliased column reference: the output name equals the input name.
    let column = |name: &str, index: usize| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: name.to_owned(),
    };

    // Select `col1` first and `col0` second; `col2` is dropped.
    let projection = ProjectionExprs::new(vec![column("col1", 1), column("col0", 0)]);

    let actual = projection
        .project_statistics(get_stats(), &get_schema())
        .unwrap();

    // Expected per-column statistics, listed in projection order.
    let col1_stats = ColumnStatistics {
        distinct_count: Precision::Exact(1),
        max_value: Precision::Exact(ScalarValue::from("x")),
        min_value: Precision::Exact(ScalarValue::from("a")),
        sum_value: Precision::Absent,
        null_count: Precision::Exact(3),
    };
    let col0_stats = ColumnStatistics {
        distinct_count: Precision::Exact(5),
        max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
        min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
        sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
        null_count: Precision::Exact(0),
    };
    let expected = Statistics {
        num_rows: Precision::Exact(5),
        total_byte_size: Precision::Exact(23),
        column_statistics: vec![col1_stats, col0_stats],
    };

    assert_eq!(actual, expected);
}
1717
#[test]
fn test_stats_projection_column_with_primitive_width_only() {
    // Un-aliased column reference: the output name equals the input name.
    let column = |name: &str, index: usize| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: name.to_owned(),
    };

    // Select only the `Float32` and `Int64` columns, in that order.
    let projection = ProjectionExprs::new(vec![column("col2", 2), column("col0", 0)]);

    let actual = projection
        .project_statistics(get_stats(), &get_schema())
        .unwrap();

    // Expected per-column statistics, listed in projection order.
    let col2_stats = ColumnStatistics {
        distinct_count: Precision::Absent,
        max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
        min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
        sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
        null_count: Precision::Absent,
    };
    let col0_stats = ColumnStatistics {
        distinct_count: Precision::Exact(5),
        max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
        min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
        sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
        null_count: Precision::Exact(0),
    };
    // The expected byte size is 60 here, not the input's 23.
    let expected = Statistics {
        num_rows: Precision::Exact(5),
        total_byte_size: Precision::Exact(60),
        column_statistics: vec![col2_stats, col0_stats],
    };

    assert_eq!(actual, expected);
}
1759
1760    // Tests for the `ProjectionExprs` struct
1761
#[test]
fn test_projection_new() -> Result<()> {
    // `ProjectionExprs::new` should keep every expression it is given, in the
    // order it is given.
    let exprs = vec![
        ProjectionExpr {
            expr: Arc::new(Column::new("a", 0)),
            alias: "a".to_string(),
        },
        ProjectionExpr {
            expr: Arc::new(Column::new("b", 1)),
            alias: "b".to_string(),
        },
    ];

    // `exprs` is not used again, so it is moved rather than cloned.
    let projection = ProjectionExprs::new(exprs);

    let projected: &[ProjectionExpr] = projection.as_ref();
    assert_eq!(projected.len(), 2);
    // A length check alone would not notice reordered or renamed entries.
    assert_eq!(projected[0].alias, "a");
    assert_eq!(projected[1].alias, "b");
    Ok(())
}
1778
#[test]
fn test_projection_from_vec() -> Result<()> {
    // A `Vec<ProjectionExpr>` converts into `ProjectionExprs` via `Into`.
    let exprs = vec![ProjectionExpr {
        expr: Arc::new(Column::new("x", 0)),
        alias: "x".to_string(),
    }];

    // `exprs` is not used again, so it is moved rather than cloned.
    let projection: ProjectionExprs = exprs.into();

    let projected: &[ProjectionExpr] = projection.as_ref();
    assert_eq!(projected.len(), 1);
    // The single expression must survive the conversion with its alias intact.
    assert_eq!(projected[0].alias, "x");
    Ok(())
}
1789
#[test]
fn test_projection_as_ref() -> Result<()> {
    // Un-aliased column reference: the output name equals the input name.
    let column = |name: &str, index: usize| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: name.to_owned(),
    };
    let projection = ProjectionExprs::new(vec![column("col1", 0), column("col2", 1)]);

    // Borrow the projection as a plain slice of its expressions.
    let expr_slice: &[ProjectionExpr] = projection.as_ref();
    let expr_count = expr_slice.len();
    assert_eq!(expr_count, 2);
    Ok(())
}
1807
#[test]
fn test_column_indices_multiple_columns() -> Result<()> {
    // Un-aliased column reference: the output name equals the input name.
    let column = |name: &str, index: usize| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: name.to_owned(),
    };

    // The input columns are referenced in descending index order (5, 2, 0),
    // so an ascending result cannot come from keeping projection order.
    let projection =
        ProjectionExprs::new(vec![column("c", 5), column("b", 2), column("a", 0)]);

    // The indices are expected in ascending order regardless of projection order.
    let indices = projection.column_indices();
    assert_eq!(indices, vec![0, 2, 5]);
    Ok(())
}
1829
#[test]
fn test_column_indices_duplicates() -> Result<()> {
    // Un-aliased column reference: the output name equals the input name.
    let column = |name: &str, index: usize| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: name.to_owned(),
    };

    // `a` and `a2` both read input index 1, so that index is referenced twice.
    let projection =
        ProjectionExprs::new(vec![column("a", 1), column("b", 3), column("a2", 1)]);

    // Each referenced index must be reported exactly once.
    let indices = projection.column_indices();
    assert_eq!(indices, vec![1, 3]);
    Ok(())
}
1850
#[test]
fn test_column_indices_unsorted() -> Result<()> {
    // Un-aliased column reference: the output name equals the input name.
    let column = |name: &str, index: usize| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: name.to_owned(),
    };

    // Indices are supplied in the order 5, 1, 3, which is neither ascending
    // nor descending.
    let projection =
        ProjectionExprs::new(vec![column("c", 5), column("a", 1), column("b", 3)]);

    // The reported indices must be in ascending order.
    let indices = projection.column_indices();
    assert_eq!(indices, vec![1, 3, 5]);
    Ok(())
}
1871
#[test]
fn test_column_indices_complex_expr() -> Result<()> {
    // `a@1 + b@4`: one expression that references two input columns.
    let a_plus_b = BinaryExpr::new(
        Arc::new(Column::new("a", 1)),
        Operator::Plus,
        Arc::new(Column::new("b", 4)),
    );
    let sum = ProjectionExpr {
        expr: Arc::new(a_plus_b),
        alias: "sum".to_owned(),
    };
    // Plain reference to `c@2`.
    let plain = ProjectionExpr {
        expr: Arc::new(Column::new("c", 2)),
        alias: "c".to_owned(),
    };
    let projection = ProjectionExprs::new(vec![sum, plain]);

    // Every column used by either expression is expected: sorted and
    // deduplicated, that is [1, 2, 4].
    let indices = projection.column_indices();
    assert_eq!(indices, vec![1, 2, 4]);
    Ok(())
}
1894
#[test]
fn test_column_indices_empty() -> Result<()> {
    // A projection with no expressions references no input columns.
    let no_exprs: Vec<ProjectionExpr> = Vec::new();
    let projection = ProjectionExprs::new(no_exprs);

    let expected: Vec<usize> = Vec::new();
    assert_eq!(projection.column_indices(), expected);
    Ok(())
}
1901
#[test]
fn test_merge_simple_columns() -> Result<()> {
    // Column reference `name@index` exposed under `alias`.
    let aliased = |name: &str, index: usize, alias: &str| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: alias.to_owned(),
    };

    // Lower projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
    let base_projection = ProjectionExprs::new(vec![
        aliased("c", 2, "x"),
        aliased("b", 1, "y"),
        aliased("a", 0, "z"),
    ]);

    // Upper projection, expressed against the lower one's output:
    // SELECT y@1 AS col2, x@0 AS col1
    let top_projection =
        ProjectionExprs::new(vec![aliased("y", 1, "col2"), aliased("x", 0, "col1")]);

    // Merging should yield a single projection over the original input:
    // SELECT b@1 AS col2, c@2 AS col1
    let merged = base_projection.try_merge(&top_projection)?;
    assert_snapshot!(format!("{merged}"), @"Projection[b@1 AS col2, c@2 AS col1]");

    Ok(())
}
1938
#[test]
fn test_merge_with_expressions() -> Result<()> {
    // Column reference `name@index` exposed under `alias`.
    let aliased = |name: &str, index: usize, alias: &str| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: alias.to_owned(),
    };

    // Lower projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
    let base_projection = ProjectionExprs::new(vec![
        aliased("c", 2, "x"),
        aliased("b", 1, "y"),
        aliased("a", 0, "z"),
    ]);

    // Upper projection, expressed against the lower one's output:
    // SELECT y@1 + z@2 AS c2, x@0 + 1 AS c1
    let y_plus_z = BinaryExpr::new(
        Arc::new(Column::new("y", 1)),
        Operator::Plus,
        Arc::new(Column::new("z", 2)),
    );
    let x_plus_one = BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
    );
    let top_projection = ProjectionExprs::new(vec![
        ProjectionExpr {
            expr: Arc::new(y_plus_z),
            alias: "c2".to_owned(),
        },
        ProjectionExpr {
            expr: Arc::new(x_plus_one),
            alias: "c1".to_owned(),
        },
    ]);

    // Merging should rewrite the upper expressions over the original input:
    // SELECT b@1 + a@0 AS c2, c@2 + 1 AS c1
    let merged = base_projection.try_merge(&top_projection)?;
    assert_snapshot!(format!("{merged}"), @"Projection[b@1 + a@0 AS c2, c@2 + 1 AS c1]");

    Ok(())
}
1983
#[test]
fn try_merge_error() {
    // Column reference `name@index` exposed under `alias`.
    let aliased = |name: &str, index: usize, alias: &str| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: alias.to_owned(),
    };

    // Base projection with exactly two output expressions (indices 0 and 1).
    let base = ProjectionExprs::new(vec![aliased("a", 0, "x"), aliased("b", 1, "y")]);

    // Top projection that points at index 5, which the base does not have.
    let top = ProjectionExprs::new(vec![aliased("z", 5, "result")]);

    // The merge must fail, and the failure must name the offending index and
    // the number of available expressions.
    let merge_error = base.try_merge(&top).unwrap_err();
    let err_msg = merge_error.to_string();
    let mentions_out_of_bounds = err_msg.contains(
        "Internal error: Column index 5 out of bounds for projected expressions of length 2",
    );
    assert!(
        mentions_out_of_bounds,
        "Unexpected error message: {err_msg}",
    );
}
2011
#[test]
fn test_project_schema_simple_columns() -> Result<()> {
    // Column reference `name@index` exposed under `alias`.
    let aliased = |name: &str, index: usize, alias: &str| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: alias.to_owned(),
    };

    // Input schema: [col0: Int64, col1: Utf8, col2: Float32]
    let input_schema = get_schema();

    // Projection: SELECT col2 AS c, col0 AS a
    let projection =
        ProjectionExprs::new(vec![aliased("col2", 2, "c"), aliased("col0", 0, "a")]);

    let output_schema = projection.project_schema(&input_schema)?;

    // One output field per projection expression.
    assert_eq!(output_schema.fields().len(), 2);

    // Output fields take the alias as their name and the source column's type:
    // "c" is Float32 and "a" is Int64.
    let expected_fields = [("c", DataType::Float32), ("a", DataType::Int64)];
    for (position, (name, data_type)) in expected_fields.into_iter().enumerate() {
        let field = output_schema.field(position);
        assert_eq!(field.name(), name);
        assert_eq!(field.data_type(), &data_type);
    }

    Ok(())
}
2044
#[test]
fn test_project_schema_with_expressions() -> Result<()> {
    // Input schema: [col0: Int64, col1: Utf8, col2: Float32]
    let input_schema = get_schema();

    // Projection: SELECT col0 + 1 AS incremented
    let col0_plus_one = BinaryExpr::new(
        Arc::new(Column::new("col0", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
    );
    let incremented = ProjectionExpr {
        expr: Arc::new(col0_plus_one),
        alias: "incremented".to_owned(),
    };
    let projection = ProjectionExprs::new(vec![incremented]);

    let output_schema = projection.project_schema(&input_schema)?;

    // Exactly one output field is expected.
    assert_eq!(output_schema.fields().len(), 1);

    // It is named after the alias and typed as the expression's result, Int64.
    let only_field = output_schema.field(0);
    assert_eq!(only_field.name(), "incremented");
    assert_eq!(only_field.data_type(), &DataType::Int64);

    Ok(())
}
2071
#[test]
fn test_project_schema_preserves_metadata() -> Result<()> {
    // Input schema whose first field carries a single metadata entry.
    let metadata = HashMap::from([("key".to_string(), "value".to_string())]);
    let annotated_field =
        Field::new("col0", DataType::Int64, false).with_metadata(metadata.clone());
    let plain_field = Field::new("col1", DataType::Utf8, false);
    let input_schema = Schema::new(vec![annotated_field, plain_field]);

    // Projection: SELECT col0 AS renamed
    let renamed = ProjectionExpr {
        expr: Arc::new(Column::new("col0", 0)),
        alias: "renamed".to_owned(),
    };
    let projection = ProjectionExprs::new(vec![renamed]);

    let output_schema = projection.project_schema(&input_schema)?;

    // Exactly one output field is expected.
    assert_eq!(output_schema.fields().len(), 1);

    // The field is renamed but must still carry the source field's metadata.
    let only_field = output_schema.field(0);
    assert_eq!(only_field.name(), "renamed");
    assert_eq!(only_field.metadata(), &metadata);

    Ok(())
}
2101
#[test]
fn test_project_schema_empty() -> Result<()> {
    // Projecting zero expressions must yield a schema with zero fields.
    let no_exprs: Vec<ProjectionExpr> = Vec::new();
    let projection = ProjectionExprs::new(no_exprs);
    let input_schema = get_schema();

    let output_schema = projection.project_schema(&input_schema)?;

    let field_count = output_schema.fields().len();
    assert_eq!(field_count, 0);

    Ok(())
}
2113
#[test]
fn test_project_statistics_columns_only() -> Result<()> {
    // Column reference `name@index` exposed under `alias`.
    let aliased = |name: &str, index: usize, alias: &str| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: alias.to_owned(),
    };

    // Projection: SELECT col1 AS text, col0 AS num
    let projection = ProjectionExprs::new(vec![
        aliased("col1", 1, "text"),
        aliased("col0", 0, "num"),
    ]);

    let input_schema = get_schema();
    let output_stats = projection.project_statistics(get_stats(), &input_schema)?;

    // The row count is carried over from the input.
    assert_eq!(output_stats.num_rows, Precision::Exact(5));

    // One statistics entry per projected column, in projection order.
    let columns = &output_stats.column_statistics;
    assert_eq!(columns.len(), 2);

    // Output column 0 is input `col1`.
    let text_stats = &columns[0];
    assert_eq!(text_stats.distinct_count, Precision::Exact(1));
    assert_eq!(
        text_stats.max_value,
        Precision::Exact(ScalarValue::from("x"))
    );

    // Output column 1 is input `col0`.
    let num_stats = &columns[1];
    assert_eq!(num_stats.distinct_count, Precision::Exact(5));
    assert_eq!(
        num_stats.max_value,
        Precision::Exact(ScalarValue::Int64(Some(21)))
    );

    Ok(())
}
2161
#[test]
fn test_project_statistics_with_expressions() -> Result<()> {
    // Projection: SELECT col0 + 1 AS incremented, col1 AS text
    let col0_plus_one = BinaryExpr::new(
        Arc::new(Column::new("col0", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
    );
    let incremented = ProjectionExpr {
        expr: Arc::new(col0_plus_one),
        alias: "incremented".to_owned(),
    };
    let text = ProjectionExpr {
        expr: Arc::new(Column::new("col1", 1)),
        alias: "text".to_owned(),
    };
    let projection = ProjectionExprs::new(vec![incremented, text]);

    let input_schema = get_schema();
    let output_stats = projection.project_statistics(get_stats(), &input_schema)?;

    // The row count is carried over from the input.
    assert_eq!(output_stats.num_rows, Precision::Exact(5));

    // One statistics entry per projected expression.
    let columns = &output_stats.column_statistics;
    assert_eq!(columns.len(), 2);

    // The computed expression should come back with unknown statistics.
    let incremented_stats = &columns[0];
    assert_eq!(incremented_stats.distinct_count, Precision::Absent);
    assert_eq!(incremented_stats.max_value, Precision::Absent);

    // The plain `col1` reference should keep the input column's statistics.
    let text_stats = &columns[1];
    assert_eq!(text_stats.distinct_count, Precision::Exact(1));

    Ok(())
}
2209
#[test]
fn test_project_statistics_primitive_width_only() -> Result<()> {
    // Column reference `name@index` exposed under `alias`.
    let aliased = |name: &str, index: usize, alias: &str| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: alias.to_owned(),
    };

    // Only primitive-width columns are projected: SELECT col2 AS f, col0 AS i
    let projection =
        ProjectionExprs::new(vec![aliased("col2", 2, "f"), aliased("col0", 0, "i")]);

    let input_schema = get_schema();
    let output_stats = projection.project_statistics(get_stats(), &input_schema)?;

    // The row count is carried over from the input.
    assert_eq!(output_stats.num_rows, Precision::Exact(5));

    // The byte size is expected to be recomputed from the column widths:
    // Float32 (4 bytes) + Int64 (8 bytes) = 12 bytes per row, 5 rows = 60 bytes.
    assert_eq!(output_stats.total_byte_size, Precision::Exact(60));

    // One statistics entry per projected column.
    let column_count = output_stats.column_statistics.len();
    assert_eq!(column_count, 2);

    Ok(())
}
2241
#[test]
fn test_project_statistics_empty() -> Result<()> {
    // Project zero expressions over the shared fixture.
    let no_exprs: Vec<ProjectionExpr> = Vec::new();
    let projection = ProjectionExprs::new(no_exprs);

    let input_schema = get_schema();
    let output_stats = projection.project_statistics(get_stats(), &input_schema)?;

    // The row count is carried over from the input.
    assert_eq!(output_stats.num_rows, Precision::Exact(5));

    // With nothing projected there are no per-column statistics.
    let column_count = output_stats.column_statistics.len();
    assert_eq!(column_count, 0);

    // An empty projection is expected to report a byte size of exactly zero.
    assert_eq!(output_stats.total_byte_size, Precision::Exact(0));

    Ok(())
}
2262}