Skip to main content

datafusion_physical_expr/
projection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ProjectionExpr`] and [`ProjectionExprs`] for representing projections.
19
20use std::ops::Deref;
21use std::sync::Arc;
22
23use crate::PhysicalExpr;
24use crate::expressions::{Column, Literal};
25use crate::utils::collect_columns;
26
27use arrow::array::{RecordBatch, RecordBatchOptions};
28use arrow::datatypes::{Field, Schema, SchemaRef};
29use datafusion_common::stats::{ColumnStatistics, Precision};
30use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
31use datafusion_common::{
32    Result, ScalarValue, Statistics, assert_or_internal_err, internal_datafusion_err,
33    plan_err,
34};
35
36use datafusion_physical_expr_common::metrics::ExecutionPlanMetricsSet;
37use datafusion_physical_expr_common::metrics::ExpressionEvaluatorMetrics;
38use datafusion_physical_expr_common::physical_expr::fmt_sql;
39use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
40use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays_with_metrics;
41use indexmap::IndexMap;
42use itertools::Itertools;
43
44/// An expression used by projection operations.
45///
46/// The expression is evaluated and the result is stored in a column
47/// with the name specified by `alias`.
48///
49/// For example, the SQL expression `a + b AS sum_ab` would be represented
50/// as a `ProjectionExpr` where `expr` is the expression `a + b`
51/// and `alias` is the string `sum_ab`.
52///
53/// See [`ProjectionExprs`] for a collection of projection expressions.
54#[derive(Debug, Clone)]
55pub struct ProjectionExpr {
56    /// The expression that will be evaluated.
57    pub expr: Arc<dyn PhysicalExpr>,
58    /// The name of the output column for use an output schema.
59    pub alias: String,
60}
61
62impl PartialEq for ProjectionExpr {
63    fn eq(&self, other: &Self) -> bool {
64        let ProjectionExpr { expr, alias } = self;
65        expr.eq(&other.expr) && *alias == other.alias
66    }
67}
68
69impl Eq for ProjectionExpr {}
70
71impl std::fmt::Display for ProjectionExpr {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        if self.expr.to_string() == self.alias {
74            write!(f, "{}", self.alias)
75        } else {
76            write!(f, "{} AS {}", self.expr, self.alias)
77        }
78    }
79}
80
81impl ProjectionExpr {
82    /// Create a new projection expression
83    pub fn new(expr: Arc<dyn PhysicalExpr>, alias: impl Into<String>) -> Self {
84        let alias = alias.into();
85        Self { expr, alias }
86    }
87
88    /// Create a new projection expression from an expression and a schema using the expression's output field name as alias.
89    pub fn new_from_expression(
90        expr: Arc<dyn PhysicalExpr>,
91        schema: &Schema,
92    ) -> Result<Self> {
93        let field = expr.return_field(schema)?;
94        Ok(Self {
95            expr,
96            alias: field.name().to_string(),
97        })
98    }
99}
100
101impl From<(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
102    fn from(value: (Arc<dyn PhysicalExpr>, String)) -> Self {
103        Self::new(value.0, value.1)
104    }
105}
106
107impl From<&(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
108    fn from(value: &(Arc<dyn PhysicalExpr>, String)) -> Self {
109        Self::new(Arc::clone(&value.0), value.1.clone())
110    }
111}
112
113impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
114    fn from(value: ProjectionExpr) -> Self {
115        (value.expr, value.alias)
116    }
117}
118
119/// A collection of  [`ProjectionExpr`] instances, representing a complete
120/// projection operation.
121///
122/// Projection operations are used in query plans to select specific columns or
123/// compute new columns based on existing ones.
124///
125/// See [`ProjectionExprs::from_indices`] to select a subset of columns by
126/// indices.
127#[derive(Debug, Clone, PartialEq, Eq)]
128pub struct ProjectionExprs {
129    /// [`Arc`] used for a cheap clone, which improves physical plan optimization performance.
130    exprs: Arc<[ProjectionExpr]>,
131}
132
133impl std::fmt::Display for ProjectionExprs {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        let exprs: Vec<String> = self.exprs.iter().map(|e| e.to_string()).collect();
136        write!(f, "Projection[{}]", exprs.join(", "))
137    }
138}
139
140impl From<Vec<ProjectionExpr>> for ProjectionExprs {
141    fn from(value: Vec<ProjectionExpr>) -> Self {
142        Self {
143            exprs: value.into(),
144        }
145    }
146}
147
148impl From<&[ProjectionExpr]> for ProjectionExprs {
149    fn from(value: &[ProjectionExpr]) -> Self {
150        Self {
151            exprs: value.iter().cloned().collect(),
152        }
153    }
154}
155
156impl FromIterator<ProjectionExpr> for ProjectionExprs {
157    fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self {
158        Self {
159            exprs: exprs.into_iter().collect(),
160        }
161    }
162}
163
164impl AsRef<[ProjectionExpr]> for ProjectionExprs {
165    fn as_ref(&self) -> &[ProjectionExpr] {
166        &self.exprs
167    }
168}
169
170impl ProjectionExprs {
171    /// Make a new [`ProjectionExprs`] from expressions iterator.
172    pub fn new(exprs: impl IntoIterator<Item = ProjectionExpr>) -> Self {
173        Self {
174            exprs: exprs.into_iter().collect(),
175        }
176    }
177
178    /// Make a new [`ProjectionExprs`] from expressions.
179    pub fn from_expressions(exprs: impl Into<Arc<[ProjectionExpr]>>) -> Self {
180        Self {
181            exprs: exprs.into(),
182        }
183    }
184
185    /// Creates a [`ProjectionExpr`] from a list of column indices.
186    ///
187    /// This is a convenience method for creating simple column-only projections, where each projection expression is a reference to a column
188    /// in the input schema.
189    ///
190    /// # Behavior
191    /// - Ordering: the output projection preserves the exact order of indices provided in the input slice
192    ///   For example, `[2, 0, 1]` will produce projections for columns 2, 0, then 1 in that order
193    /// - Duplicates: Duplicate indices are allowed and will create multiple projection expressions referencing the same source column
194    ///   For example, `[0, 0]` creates 2 separate projections both referencing column 0
195    ///
196    /// # Panics
197    /// Panics if any index in `indices` is out of bounds for the provided schema.
198    ///
199    /// # Example
200    ///
201    /// ```rust
202    /// use arrow::datatypes::{DataType, Field, Schema};
203    /// use datafusion_physical_expr::projection::ProjectionExprs;
204    /// use std::sync::Arc;
205    ///
206    /// // Create a schema with three columns
207    /// let schema = Arc::new(Schema::new(vec![
208    ///     Field::new("a", DataType::Int32, false),
209    ///     Field::new("b", DataType::Utf8, false),
210    ///     Field::new("c", DataType::Float64, false),
211    /// ]));
212    ///
213    /// // Project columns at indices 2 and 0 (c and a) - ordering is preserved
214    /// let projection = ProjectionExprs::from_indices(&[2, 0], &schema);
215    ///
216    /// // This creates: SELECT c@2 AS c, a@0 AS a
217    /// assert_eq!(projection.as_ref().len(), 2);
218    /// assert_eq!(projection.as_ref()[0].alias, "c");
219    /// assert_eq!(projection.as_ref()[1].alias, "a");
220    ///
221    /// // Duplicate indices are allowed
222    /// let projection_with_dups = ProjectionExprs::from_indices(&[0, 0, 1], &schema);
223    /// assert_eq!(projection_with_dups.as_ref().len(), 3);
224    /// assert_eq!(projection_with_dups.as_ref()[0].alias, "a");
225    /// assert_eq!(projection_with_dups.as_ref()[1].alias, "a"); // duplicate
226    /// assert_eq!(projection_with_dups.as_ref()[2].alias, "b");
227    /// ```
228    pub fn from_indices(indices: &[usize], schema: &Schema) -> Self {
229        let projection_exprs = indices.iter().map(|&i| {
230            let field = schema.field(i);
231            ProjectionExpr {
232                expr: Arc::new(Column::new(field.name(), i)),
233                alias: field.name().clone(),
234            }
235        });
236
237        Self::from_iter(projection_exprs)
238    }
239
240    /// Returns an iterator over the projection expressions
241    pub fn iter(&self) -> impl Iterator<Item = &ProjectionExpr> {
242        self.exprs.iter()
243    }
244
245    /// Creates a ProjectionMapping from this projection
246    pub fn projection_mapping(
247        &self,
248        input_schema: &SchemaRef,
249    ) -> Result<ProjectionMapping> {
250        ProjectionMapping::try_new(
251            self.exprs
252                .iter()
253                .map(|p| (Arc::clone(&p.expr), p.alias.clone())),
254            input_schema,
255        )
256    }
257
258    /// Iterate over a clone of the projection expressions.
259    pub fn expr_iter(&self) -> impl Iterator<Item = Arc<dyn PhysicalExpr>> + '_ {
260        self.exprs.iter().map(|e| Arc::clone(&e.expr))
261    }
262
263    /// Apply a fallible transformation to the [`PhysicalExpr`] of each projection.
264    ///
265    /// This method transforms the expression in each [`ProjectionExpr`] while preserving
266    /// the alias. This is useful for rewriting expressions, such as when adapting
267    /// expressions to a different schema.
268    ///
269    /// # Example
270    ///
271    /// ```rust
272    /// use std::sync::Arc;
273    /// use arrow::datatypes::{DataType, Field, Schema};
274    /// use datafusion_common::Result;
275    /// use datafusion_physical_expr::expressions::Column;
276    /// use datafusion_physical_expr::projection::ProjectionExprs;
277    /// use datafusion_physical_expr::PhysicalExpr;
278    ///
279    /// // Create a schema and projection
280    /// let schema = Arc::new(Schema::new(vec![
281    ///     Field::new("a", DataType::Int32, false),
282    ///     Field::new("b", DataType::Int32, false),
283    /// ]));
284    /// let projection = ProjectionExprs::from_indices(&[0, 1], &schema);
285    ///
286    /// // Transform each expression (this example just clones them)
287    /// let transformed = projection.try_map_exprs(|expr| Ok(expr))?;
288    /// assert_eq!(transformed.as_ref().len(), 2);
289    /// # Ok::<(), datafusion_common::DataFusionError>(())
290    /// ```
291    pub fn try_map_exprs<F>(self, mut f: F) -> Result<Self>
292    where
293        F: FnMut(Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>,
294    {
295        let exprs = self
296            .exprs
297            .iter()
298            .cloned()
299            .map(|mut proj| {
300                proj.expr = f(proj.expr)?;
301                Ok(proj)
302            })
303            .collect::<Result<Arc<_>>>()?;
304        Ok(Self::from_expressions(exprs))
305    }
306
307    /// Apply another projection on top of this projection, returning the combined projection.
308    /// For example, if this projection is `SELECT c@2 AS x, b@1 AS y, a@0 as z` and the other projection is `SELECT x@0 + 1 AS c1, y@1 + z@2 as c2`,
309    /// we return a projection equivalent to `SELECT c@2 + 1 AS c1, b@1 + a@0 as c2`.
310    ///
311    /// # Example
312    ///
313    /// ```rust
314    /// use datafusion_common::{Result, ScalarValue};
315    /// use datafusion_expr::Operator;
316    /// use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
317    /// use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
318    /// use std::sync::Arc;
319    ///
320    /// fn main() -> Result<()> {
321    ///     // Example from the docstring:
322    ///     // Base projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
323    ///     let base = ProjectionExprs::new(vec![
324    ///         ProjectionExpr {
325    ///             expr: Arc::new(Column::new("c", 2)),
326    ///             alias: "x".to_string(),
327    ///         },
328    ///         ProjectionExpr {
329    ///             expr: Arc::new(Column::new("b", 1)),
330    ///             alias: "y".to_string(),
331    ///         },
332    ///         ProjectionExpr {
333    ///             expr: Arc::new(Column::new("a", 0)),
334    ///             alias: "z".to_string(),
335    ///         },
336    ///     ]);
337    ///
338    ///     // Top projection: SELECT x@0 + 1 AS c1, y@1 + z@2 AS c2
339    ///     let top = ProjectionExprs::new(vec![
340    ///         ProjectionExpr {
341    ///             expr: Arc::new(BinaryExpr::new(
342    ///                 Arc::new(Column::new("x", 0)),
343    ///                 Operator::Plus,
344    ///                 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
345    ///             )),
346    ///             alias: "c1".to_string(),
347    ///         },
348    ///         ProjectionExpr {
349    ///             expr: Arc::new(BinaryExpr::new(
350    ///                 Arc::new(Column::new("y", 1)),
351    ///                 Operator::Plus,
352    ///                 Arc::new(Column::new("z", 2)),
353    ///             )),
354    ///             alias: "c2".to_string(),
355    ///         },
356    ///     ]);
357    ///
358    ///     // Expected result: SELECT c@2 + 1 AS c1, b@1 + a@0 AS c2
359    ///     let result = base.try_merge(&top)?;
360    ///
361    ///     assert_eq!(result.as_ref().len(), 2);
362    ///     assert_eq!(result.as_ref()[0].alias, "c1");
363    ///     assert_eq!(result.as_ref()[1].alias, "c2");
364    ///
365    ///     Ok(())
366    /// }
367    /// ```
368    ///
369    /// # Errors
370    /// This function returns an error if any expression in the `other` projection cannot be
371    /// applied on top of this projection.
372    pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
373        let mut new_exprs = Vec::with_capacity(other.exprs.len());
374        for proj_expr in other.exprs.iter() {
375            new_exprs.push(ProjectionExpr {
376                expr: self.unproject_expr(&proj_expr.expr)?,
377                alias: proj_expr.alias.clone(),
378            });
379        }
380        Ok(ProjectionExprs::new(new_exprs))
381    }
382
383    /// Extract the column indices used in this projection.
384    /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
385    /// this function would return `[0, 1]`.
386    /// Repeated indices are returned only once, and the order is ascending.
387    pub fn column_indices(&self) -> Vec<usize> {
388        self.exprs
389            .iter()
390            .flat_map(|e| collect_columns(&e.expr).into_iter().map(|col| col.index()))
391            .sorted_unstable()
392            .dedup()
393            .collect_vec()
394    }
395
396    /// Extract the ordered column indices for a column-only projection.
397    ///
398    /// This function assumes that all expressions in the projection are simple column references.
399    /// It returns the column indices in the order they appear in the projection.
400    ///
401    /// # Panics
402    ///
403    /// Panics if any expression in the projection is not a simple column reference. This includes:
404    /// - Computed expressions (e.g., `a + 1`, `CAST(a AS INT)`)
405    /// - Function calls (e.g., `UPPER(name)`, `SUM(amount)`)
406    /// - Literals (e.g., `42`, `'hello'`)
407    /// - Complex nested expressions (e.g., `CASE WHEN ... THEN ... END`)
408    ///
409    /// # Returns
410    ///
411    /// A vector of column indices in projection order. Unlike [`column_indices()`](Self::column_indices),
412    /// this function:
413    /// - Preserves the projection order (does not sort)
414    /// - Preserves duplicates (does not deduplicate)
415    ///
416    /// # Example
417    ///
418    /// For a projection `SELECT c, a, c` where `a` is at index 0 and `c` is at index 2,
419    /// this function would return `[2, 0, 2]`.
420    ///
421    /// Use [`column_indices()`](Self::column_indices) instead if the projection may contain
422    /// non-column expressions or if you need a deduplicated sorted list.
423    ///
424    /// # Panics
425    ///
426    /// Panics if any expression in the projection is not a simple column reference.
427    #[deprecated(
428        since = "52.0.0",
429        note = "Use column_indices() instead. This method will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
430    )]
431    pub fn ordered_column_indices(&self) -> Vec<usize> {
432        self.exprs
433            .iter()
434            .map(|e| {
435                e.expr
436                    .as_any()
437                    .downcast_ref::<Column>()
438                    .expect("Expected column reference in projection")
439                    .index()
440            })
441            .collect()
442    }
443
444    /// Project a schema according to this projection.
445    ///
446    /// For example, given a projection:
447    /// * `SELECT a AS x, b + 1 AS y`
448    /// * where `a` is at index 0
449    /// * `b` is at index 1
450    ///
451    /// If the input schema is `[a: Int32, b: Int32, c: Int32]`, the output
452    /// schema would be `[x: Int32, y: Int32]`.
453    ///
454    /// Note that [`Field`] metadata are preserved from the input schema.
455    pub fn project_schema(&self, input_schema: &Schema) -> Result<Schema> {
456        let fields: Result<Vec<Field>> = self
457            .exprs
458            .iter()
459            .map(|proj_expr| {
460                let metadata = proj_expr
461                    .expr
462                    .return_field(input_schema)?
463                    .metadata()
464                    .clone();
465
466                let field = Field::new(
467                    &proj_expr.alias,
468                    proj_expr.expr.data_type(input_schema)?,
469                    proj_expr.expr.nullable(input_schema)?,
470                )
471                .with_metadata(metadata);
472
473                Ok(field)
474            })
475            .collect();
476
477        Ok(Schema::new_with_metadata(
478            fields?,
479            input_schema.metadata().clone(),
480        ))
481    }
482
483    /// "unproject" an expression by applying this projection in reverse,
484    /// returning a new set of expressions that reference the original input
485    /// columns.
486    ///
487    /// For example, consider
488    /// * an expression `c1_c2 > 5`, and a schema `[c1, c2]`
489    /// * a projection `c1 + c2 as c1_c2`
490    ///
491    /// This method would rewrite the expression to `c1 + c2 > 5`
492    pub fn unproject_expr(
493        &self,
494        expr: &Arc<dyn PhysicalExpr>,
495    ) -> Result<Arc<dyn PhysicalExpr>> {
496        update_expr(expr, &self.exprs, true)?.ok_or_else(|| {
497            internal_datafusion_err!(
498                "Failed to unproject an expression {} with ProjectionExprs {}",
499                expr,
500                self.exprs.iter().map(|e| format!("{e}")).join(", ")
501            )
502        })
503    }
504
505    /// "project" an expression using these projection's expressions
506    ///
507    /// For example, consider
508    /// * an expression `c1 + c2 > 5`, and a schema `[c1, c2]`
509    /// * a projection `c1 + c2 as c1_c2`
510    ///
511    /// * This method would rewrite the expression to `c1_c2 > 5`
512    pub fn project_expr(
513        &self,
514        expr: &Arc<dyn PhysicalExpr>,
515    ) -> Result<Arc<dyn PhysicalExpr>> {
516        update_expr(expr, &self.exprs, false)?.ok_or_else(|| {
517            internal_datafusion_err!(
518                "Failed to project an expression {} with ProjectionExprs {}",
519                expr,
520                self.exprs.iter().map(|e| format!("{e}")).join(", ")
521            )
522        })
523    }
524
525    /// Create a new [`Projector`] from this projection and an input schema.
526    ///
527    /// A [`Projector`] can be used to apply this projection to record batches.
528    ///
529    /// # Errors
530    /// This function returns an error if the output schema cannot be constructed from the input schema
531    /// with the given projection expressions.
532    /// For example, if an expression only works with integer columns but the input schema has a string column at that index.
533    pub fn make_projector(&self, input_schema: &Schema) -> Result<Projector> {
534        let output_schema = Arc::new(self.project_schema(input_schema)?);
535        Ok(Projector {
536            projection: self.clone(),
537            output_schema,
538            expression_metrics: None,
539        })
540    }
541
542    pub fn create_expression_metrics(
543        &self,
544        metrics: &ExecutionPlanMetricsSet,
545        partition: usize,
546    ) -> ExpressionEvaluatorMetrics {
547        let labels: Vec<String> = self
548            .exprs
549            .iter()
550            .map(|proj_expr| {
551                let expr_sql = fmt_sql(proj_expr.expr.as_ref()).to_string();
552                if proj_expr.expr.to_string() == proj_expr.alias {
553                    expr_sql
554                } else {
555                    format!("{expr_sql} AS {}", proj_expr.alias)
556                }
557            })
558            .collect();
559        ExpressionEvaluatorMetrics::new(metrics, partition, labels)
560    }
561
562    /// Project statistics according to this projection.
563    /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
564    /// if the input statistics has column statistics for columns `a`, `b`, and `c`, the output statistics would have column statistics for columns `x` and `y`.
565    ///
566    /// # Example
567    ///
568    /// ```rust
569    /// use arrow::datatypes::{DataType, Field, Schema};
570    /// use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
571    /// use datafusion_physical_expr::projection::ProjectionExprs;
572    /// use datafusion_common::Result;
573    /// use datafusion_common::ScalarValue;
574    /// use std::sync::Arc;
575    ///
576    /// fn main() -> Result<()> {
577    ///     // Input schema: a: Int32, b: Int32, c: Int32
578    ///     let input_schema = Arc::new(Schema::new(vec![
579    ///         Field::new("a", DataType::Int32, false),
580    ///         Field::new("b", DataType::Int32, false),
581    ///         Field::new("c", DataType::Int32, false),
582    ///     ]));
583    ///
584    ///     // Input statistics with column stats for a, b, c
585    ///     let input_stats = Statistics {
586    ///         num_rows: Precision::Exact(100),
587    ///         total_byte_size: Precision::Exact(1200),
588    ///         column_statistics: vec![
589    ///             // Column a stats
590    ///             ColumnStatistics::new_unknown()
591    ///                 .with_null_count(Precision::Exact(0))
592    ///                 .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
593    ///                 .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
594    ///                 .with_distinct_count(Precision::Exact(100)),
595    ///             // Column b stats
596    ///             ColumnStatistics::new_unknown()
597    ///                 .with_null_count(Precision::Exact(0))
598    ///                 .with_min_value(Precision::Exact(ScalarValue::Int32(Some(10))))
599    ///                 .with_max_value(Precision::Exact(ScalarValue::Int32(Some(60))))
600    ///                 .with_distinct_count(Precision::Exact(50)),
601    ///             // Column c stats
602    ///             ColumnStatistics::new_unknown()
603    ///                 .with_null_count(Precision::Exact(5))
604    ///                 .with_min_value(Precision::Exact(ScalarValue::Int32(Some(-10))))
605    ///                 .with_max_value(Precision::Exact(ScalarValue::Int32(Some(200))))
606    ///                 .with_distinct_count(Precision::Exact(25)),
607    ///         ],
608    ///     };
609    ///
610    ///     // Create a projection that selects columns c and a (indices 2 and 0)
611    ///     let projection = ProjectionExprs::from_indices(&[2, 0], &input_schema);
612    ///
613    ///     // Compute output schema
614    ///     let output_schema = projection.project_schema(&input_schema)?;
615    ///
616    ///     // Project the statistics
617    ///     let output_stats = projection.project_statistics(input_stats, &output_schema)?;
618    ///
619    ///     // The output should have 2 column statistics (for c and a, in that order)
620    ///     assert_eq!(output_stats.column_statistics.len(), 2);
621    ///
622    ///     // First column in output is c (was at index 2)
623    ///     assert_eq!(
624    ///         output_stats.column_statistics[0].min_value,
625    ///         Precision::Exact(ScalarValue::Int32(Some(-10)))
626    ///     );
627    ///     assert_eq!(
628    ///         output_stats.column_statistics[0].null_count,
629    ///         Precision::Exact(5)
630    ///     );
631    ///
632    ///     // Second column in output is a (was at index 0)
633    ///     assert_eq!(
634    ///         output_stats.column_statistics[1].min_value,
635    ///         Precision::Exact(ScalarValue::Int32(Some(0)))
636    ///     );
637    ///     assert_eq!(
638    ///         output_stats.column_statistics[1].distinct_count,
639    ///         Precision::Exact(100)
640    ///     );
641    ///
642    ///     // Total byte size is recalculated based on projected columns
643    ///     assert_eq!(
644    ///         output_stats.total_byte_size,
645    ///         Precision::Exact(800), // each Int32 column is 4 bytes * 100 rows * 2 columns
646    ///     );
647    ///
648    ///     // Number of rows remains the same
649    ///     assert_eq!(output_stats.num_rows, Precision::Exact(100));
650    ///
651    ///     Ok(())
652    /// }
653    /// ```
654    pub fn project_statistics(
655        &self,
656        mut stats: Statistics,
657        output_schema: &Schema,
658    ) -> Result<Statistics> {
659        let mut column_statistics = vec![];
660
661        for proj_expr in self.exprs.iter() {
662            let expr = &proj_expr.expr;
663            let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() {
664                std::mem::take(&mut stats.column_statistics[col.index()])
665            } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
666                // Handle literal expressions (constants) by calculating proper statistics
667                let data_type = expr.data_type(output_schema)?;
668
669                if literal.value().is_null() {
670                    let null_count = match stats.num_rows {
671                        Precision::Exact(num_rows) => Precision::Exact(num_rows),
672                        _ => Precision::Absent,
673                    };
674
675                    ColumnStatistics {
676                        min_value: Precision::Exact(literal.value().clone()),
677                        max_value: Precision::Exact(literal.value().clone()),
678                        distinct_count: Precision::Exact(1),
679                        null_count,
680                        sum_value: Precision::Exact(literal.value().clone()),
681                        byte_size: Precision::Exact(0),
682                    }
683                } else {
684                    let value = literal.value();
685                    let distinct_count = Precision::Exact(1);
686                    let null_count = Precision::Exact(0);
687
688                    let byte_size = if let Some(byte_width) = data_type.primitive_width()
689                    {
690                        stats.num_rows.multiply(&Precision::Exact(byte_width))
691                    } else {
692                        // Complex types depend on array encoding, so set to Absent
693                        Precision::Absent
694                    };
695
696                    let sum_value = Precision::<ScalarValue>::from(stats.num_rows)
697                        .cast_to(&value.data_type())
698                        .ok()
699                        .map(|row_count| {
700                            Precision::Exact(value.clone()).multiply(&row_count)
701                        })
702                        .unwrap_or(Precision::Absent);
703
704                    ColumnStatistics {
705                        min_value: Precision::Exact(value.clone()),
706                        max_value: Precision::Exact(value.clone()),
707                        distinct_count,
708                        null_count,
709                        sum_value,
710                        byte_size,
711                    }
712                }
713            } else {
714                // TODO stats: estimate more statistics from expressions
715                // (expressions should compute their statistics themselves)
716                ColumnStatistics::new_unknown()
717            };
718            column_statistics.push(col_stats);
719        }
720        stats.calculate_total_byte_size(output_schema);
721        stats.column_statistics = column_statistics;
722        Ok(stats)
723    }
724}
725
726impl<'a> IntoIterator for &'a ProjectionExprs {
727    type Item = &'a ProjectionExpr;
728    type IntoIter = std::slice::Iter<'a, ProjectionExpr>;
729
730    fn into_iter(self) -> Self::IntoIter {
731        self.exprs.iter()
732    }
733}
734
735/// Applies a projection to record batches.
736///
737/// A [`Projector`] uses a set of projection expressions to transform
738/// and a pre-computed output schema to project record batches accordingly.
739///
740/// The main reason to use a `Projector` is to avoid repeatedly computing
741/// the output schema for each batch, which can be costly if the projection
742/// expressions are complex.
743#[derive(Clone, Debug)]
744pub struct Projector {
745    projection: ProjectionExprs,
746    output_schema: SchemaRef,
747    /// If `Some`, metrics will be tracked for projection evaluation.
748    expression_metrics: Option<ExpressionEvaluatorMetrics>,
749}
750
751impl Projector {
752    /// Construct the projector with metrics. After execution, related metrics will
753    /// be tracked inside `ExecutionPlanMetricsSet`
754    ///
755    /// See [`ExpressionEvaluatorMetrics`] for details.
756    pub fn with_metrics(
757        &self,
758        metrics: &ExecutionPlanMetricsSet,
759        partition: usize,
760    ) -> Self {
761        let expr_metrics = self
762            .projection
763            .create_expression_metrics(metrics, partition);
764        Self {
765            expression_metrics: Some(expr_metrics),
766            projection: self.projection.clone(),
767            output_schema: Arc::clone(&self.output_schema),
768        }
769    }
770
771    /// Project a record batch according to this projector's expressions.
772    ///
773    /// # Errors
774    /// This function returns an error if any expression evaluation fails
775    /// or if the output schema of the resulting record batch does not match
776    /// the pre-computed output schema of the projector.
777    pub fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
778        let arrays = evaluate_expressions_to_arrays_with_metrics(
779            self.projection.exprs.iter().map(|p| &p.expr),
780            batch,
781            self.expression_metrics.as_ref(),
782        )?;
783
784        if arrays.is_empty() {
785            let options =
786                RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
787            RecordBatch::try_new_with_options(
788                Arc::clone(&self.output_schema),
789                arrays,
790                &options,
791            )
792            .map_err(Into::into)
793        } else {
794            RecordBatch::try_new(Arc::clone(&self.output_schema), arrays)
795                .map_err(Into::into)
796        }
797    }
798
799    pub fn output_schema(&self) -> &SchemaRef {
800        &self.output_schema
801    }
802
803    pub fn projection(&self) -> &ProjectionExprs {
804        &self.projection
805    }
806}
807
/// Describes an immutable reference counted projection.
///
/// This structure represents projecting a set of columns by index: each
/// element is the index of the column that is selected, and the element's
/// position is that column's position in the projected output.
/// [`Arc`] is used to make it cheap to clone.
pub type ProjectionRef = Arc<[usize]>;
813
814/// Combine two projections.
815///
816/// If `p1` is [`None`] then there are no changes.
817/// Otherwise, if passed `p2` is not [`None`] then it is remapped
818/// according to the `p1`. Otherwise, there are no changes.
819///
820/// # Example
821///
822/// If stored projection is [0, 2] and we call `apply_projection([0, 2, 3])`,
823/// then the resulting projection will be [0, 3].
824///
825/// # Error
826///
827/// Returns an internal error if `p1` contains index that is greater than `p2` len.
828///
829pub fn combine_projections(
830    p1: Option<&ProjectionRef>,
831    p2: Option<&ProjectionRef>,
832) -> Result<Option<ProjectionRef>> {
833    let Some(p1) = p1 else {
834        return Ok(None);
835    };
836    let Some(p2) = p2 else {
837        return Ok(Some(Arc::clone(p1)));
838    };
839
840    Ok(Some(
841        p1.iter()
842            .map(|i| {
843                let idx = *i;
844                assert_or_internal_err!(
845                    idx < p2.len(),
846                    "unable to apply projection: index {} is greater than new projection len {}",
847                    idx,
848                    p2.len(),
849                );
850                Ok(p2[*i])
851            })
852            .collect::<Result<Arc<[usize]>>>()?,
853    ))
854}
855
856/// The function projects / unprojects an expression with respect to set of
857/// projection expressions.
858///
859/// See also [`ProjectionExprs::unproject_expr`] and [`ProjectionExprs::project_expr`]
860///
861/// 1) When `unproject` is `true`:
862///
863///    Rewrites an expression with respect to the projection expressions,
864///    effectively "unprojecting" it to reference the original input columns.
865///
866///    For example, given
867///    * the expressions `a@1 + b@2` and `c@0`
868///    * and projection expressions `c@2, a@0, b@1`
869///
870///    Then
871///    * `a@1 + b@2` becomes `a@0 + b@1`
872///    * `c@0` becomes `c@2`
873///
874/// 2) When `unproject` is `false`:
875///
876///    Rewrites the expression to reference the projected expressions,
877///    effectively "projecting" it. The resulting expression will reference the
878///    indices as they appear in the projection.
879///
880///    If the expression cannot be rewritten after the projection, it returns
881///    `None`.
882///
883///    For example, given
884///    * the expressions `c@0`, `a@1` and `b@2`
885///    * the projection `a@1 as a, c@0 as c_new`,
886///
887///    Then
888///    * `c@0` becomes `c_new@1`
889///    * `a@1` becomes `a@0`
890///    * `b@2` results in `None` since the projection does not include `b`.
891///
892/// # Errors
893/// This function returns an error if `unproject` is `true` and if any expression references
894/// an index that is out of bounds for `projected_exprs`.
895/// For example:
896///
897/// - `expr` is `a@3`
898/// - `projected_exprs` is \[`a@0`, `b@1`\]
899///
900/// In this case, `a@3` references index 3, which is out of bounds for `projected_exprs` (which has length 2).
901pub fn update_expr(
902    expr: &Arc<dyn PhysicalExpr>,
903    projected_exprs: &[ProjectionExpr],
904    unproject: bool,
905) -> Result<Option<Arc<dyn PhysicalExpr>>> {
906    #[derive(Debug, PartialEq)]
907    enum RewriteState {
908        /// The expression is unchanged.
909        Unchanged,
910        /// Some part of the expression has been rewritten
911        RewrittenValid,
912        /// Some part of the expression has been rewritten, but some column
913        /// references could not be.
914        RewrittenInvalid,
915    }
916
917    let mut state = RewriteState::Unchanged;
918
919    let new_expr = Arc::clone(expr)
920        .transform_up(|expr| {
921            if state == RewriteState::RewrittenInvalid {
922                return Ok(Transformed::no(expr));
923            }
924
925            let Some(column) = expr.as_any().downcast_ref::<Column>() else {
926                return Ok(Transformed::no(expr));
927            };
928            if unproject {
929                state = RewriteState::RewrittenValid;
930                // Update the index of `column`:
931                let projected_expr = projected_exprs.get(column.index()).ok_or_else(|| {
932                    internal_datafusion_err!(
933                        "Column index {} out of bounds for projected expressions of length {}",
934                        column.index(),
935                        projected_exprs.len()
936                    )
937                })?;
938                Ok(Transformed::yes(Arc::clone(&projected_expr.expr)))
939            } else {
940                // default to invalid, in case we can't find the relevant column
941                state = RewriteState::RewrittenInvalid;
942                // Determine how to update `column` to accommodate `projected_exprs`
943                projected_exprs
944                    .iter()
945                    .enumerate()
946                    .find_map(|(index, proj_expr)| {
947                        proj_expr.expr.as_any().downcast_ref::<Column>().and_then(
948                            |projected_column| {
949                                (column.name().eq(projected_column.name())
950                                    && column.index() == projected_column.index())
951                                .then(|| {
952                                    state = RewriteState::RewrittenValid;
953                                    Arc::new(Column::new(&proj_expr.alias, index)) as _
954                                })
955                            },
956                        )
957                    })
958                    .map_or_else(
959                        || Ok(Transformed::no(expr)),
960                        |c| Ok(Transformed::yes(c)),
961                    )
962            }
963        })
964        .data()?;
965
966    match state {
967        RewriteState::RewrittenInvalid => Ok(None),
968        // Both Unchanged and RewrittenValid are valid:
969        // - Unchanged means no columns to rewrite (e.g., literals)
970        // - RewrittenValid means columns were successfully rewritten
971        RewriteState::Unchanged | RewriteState::RewrittenValid => Ok(Some(new_expr)),
972    }
973}
974
/// Stores target expressions, along with their indices, that associate with a
/// source expression in a projection mapping.
#[derive(Clone, Debug, Default)]
pub struct ProjectionTargets {
    /// A non-empty vector of pairs of target expressions and their indices.
    /// Consider using a special non-empty collection type in the future (e.g.
    /// if Rust provides one in the standard library).
    ///
    /// NOTE: non-emptiness is a convention, not enforced by the type. The
    /// derived `Default` (and `From<Vec<_>>` with an empty vector) produce an
    /// empty instance, so at least one target must be pushed before
    /// `ProjectionTargets::first` is called.
    exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
}
984
985impl ProjectionTargets {
986    /// Returns the first target expression and its index.
987    pub fn first(&self) -> &(Arc<dyn PhysicalExpr>, usize) {
988        // Since the vector is non-empty, we can safely unwrap:
989        self.exprs_indices.first().unwrap()
990    }
991
992    /// Adds a target expression and its index to the list of targets.
993    pub fn push(&mut self, target: (Arc<dyn PhysicalExpr>, usize)) {
994        self.exprs_indices.push(target);
995    }
996}
997
998impl Deref for ProjectionTargets {
999    type Target = [(Arc<dyn PhysicalExpr>, usize)];
1000
1001    fn deref(&self) -> &Self::Target {
1002        &self.exprs_indices
1003    }
1004}
1005
1006impl From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets {
1007    fn from(exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>) -> Self {
1008        Self { exprs_indices }
1009    }
1010}
1011
/// Stores the mapping between source expressions and target expressions for a
/// projection.
#[derive(Clone, Debug)]
pub struct ProjectionMapping {
    /// Mapping between source expressions and target expressions.
    /// Keys are the source (input-side) expressions in insertion order. Each
    /// value lists the target expressions the source is projected to, and the
    /// `usize` stored with each target is its index after projection.
    map: IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>,
}
1020
1021impl ProjectionMapping {
1022    /// Constructs the mapping between a projection's input and output
1023    /// expressions.
1024    ///
1025    /// For example, given the input projection expressions (`a + b`, `c + d`)
1026    /// and an output schema with two columns `"c + d"` and `"a + b"`, the
1027    /// projection mapping would be:
1028    ///
1029    /// ```text
1030    ///  [0]: (c + d, [(col("c + d"), 0)])
1031    ///  [1]: (a + b, [(col("a + b"), 1)])
1032    /// ```
1033    ///
1034    /// where `col("c + d")` means the column named `"c + d"`.
1035    pub fn try_new(
1036        expr: impl IntoIterator<Item = (Arc<dyn PhysicalExpr>, String)>,
1037        input_schema: &SchemaRef,
1038    ) -> Result<Self> {
1039        // Construct a map from the input expressions to the output expression of the projection:
1040        let mut map = IndexMap::<_, ProjectionTargets>::new();
1041        for (expr_idx, (expr, name)) in expr.into_iter().enumerate() {
1042            let target_expr = Arc::new(Column::new(&name, expr_idx)) as _;
1043            let source_expr = expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
1044                Some(col) => {
1045                    // Sometimes, an expression and its name in the input_schema
1046                    // doesn't match. This can cause problems, so we make sure
1047                    // that the expression name matches with the name in `input_schema`.
1048                    // Conceptually, `source_expr` and `expression` should be the same.
1049                    let idx = col.index();
1050                    let matching_field = input_schema.field(idx);
1051                    let matching_name = matching_field.name();
1052                    assert_or_internal_err!(
1053                        col.name() == matching_name,
1054                        "Input field name {matching_name} does not match with the projection expression {}",
1055                        col.name()
1056                    );
1057                    let matching_column = Column::new(matching_name, idx);
1058                    Ok(Transformed::yes(Arc::new(matching_column)))
1059                }
1060                None => Ok(Transformed::no(e)),
1061            })
1062            .data()?;
1063            map.entry(source_expr)
1064                .or_default()
1065                .push((target_expr, expr_idx));
1066        }
1067        Ok(Self { map })
1068    }
1069
1070    /// Constructs a subset mapping using the provided indices.
1071    ///
1072    /// This is used when the output is a subset of the input without any
1073    /// other transformations. The indices are for columns in the schema.
1074    pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Result<Self> {
1075        let projection_exprs = indices.iter().map(|index| {
1076            let field = schema.field(*index);
1077            let column = Arc::new(Column::new(field.name(), *index));
1078            (column as _, field.name().clone())
1079        });
1080        ProjectionMapping::try_new(projection_exprs, schema)
1081    }
1082}
1083
1084impl Deref for ProjectionMapping {
1085    type Target = IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>;
1086
1087    fn deref(&self) -> &Self::Target {
1088        &self.map
1089    }
1090}
1091
1092impl FromIterator<(Arc<dyn PhysicalExpr>, ProjectionTargets)> for ProjectionMapping {
1093    fn from_iter<T: IntoIterator<Item = (Arc<dyn PhysicalExpr>, ProjectionTargets)>>(
1094        iter: T,
1095    ) -> Self {
1096        Self {
1097            map: IndexMap::from_iter(iter),
1098        }
1099    }
1100}
1101
1102/// Projects a slice of [LexOrdering]s onto the given schema.
1103///
1104/// This is a convenience wrapper that applies [project_ordering] to each
1105/// input ordering and collects the successful projections:
1106/// - For each input ordering, the result of [project_ordering] is appended to
1107///   the output if it is `Some(...)`.
1108/// - Order is preserved and no deduplication is attempted.
1109/// - If none of the input orderings can be projected, an empty `Vec` is
1110///   returned.
1111///
1112/// See [project_ordering] for the semantics of projecting a single
1113/// [LexOrdering].
1114pub fn project_orderings(
1115    orderings: &[LexOrdering],
1116    schema: &SchemaRef,
1117) -> Vec<LexOrdering> {
1118    let mut projected_orderings = vec![];
1119
1120    for ordering in orderings {
1121        projected_orderings.extend(project_ordering(ordering, schema));
1122    }
1123
1124    projected_orderings
1125}
1126
1127/// Projects a single [LexOrdering] onto the given schema.
1128///
1129/// This function attempts to rewrite every [PhysicalSortExpr] in the provided
1130/// [LexOrdering] so that any [Column] expressions point at the correct field
1131/// indices in `schema`.
1132///
1133/// Key details:
1134/// - Columns are matched by name, not by index. The index of each matched
1135///   column is looked up with [Schema::column_with_name](arrow::datatypes::Schema::column_with_name) and a new
1136///   [Column] with the correct [index](Column::index) is substituted.
1137/// - If an expression references a column name that does not exist in
1138///   `schema`, projection of the current ordering stops and only the already
1139///   rewritten prefix is kept. This models the fact that a lexicographical
1140///   ordering remains valid for any leading prefix whose expressions are
1141///   present in the projected schema.
1142/// - If no expressions can be projected (i.e. the first one is missing), the
1143///   function returns `None`.
1144///
1145/// Return value:
1146/// - `Some(LexOrdering)` if at least one sort expression could be projected.
1147///   The returned ordering may be a strict prefix of the input ordering.
1148/// - `None` if no part of the ordering can be projected onto `schema`.
1149///
1150/// Example
1151///
1152/// Suppose we have an input ordering `[col("a@0"), col("b@1")]` but the projected
1153/// schema only contains b and not a. The result will be `Some([col("a@0")])`. In other
1154/// words, the column reference is reindexed to match the projected schema.
1155/// If neither a nor b is present, the result will be None.
1156pub fn project_ordering(
1157    ordering: &LexOrdering,
1158    schema: &SchemaRef,
1159) -> Option<LexOrdering> {
1160    let mut projected_exprs = vec![];
1161    for PhysicalSortExpr { expr, options } in ordering.iter() {
1162        let transformed = Arc::clone(expr).transform_up(|expr| {
1163            let Some(col) = expr.as_any().downcast_ref::<Column>() else {
1164                return Ok(Transformed::no(expr));
1165            };
1166
1167            let name = col.name();
1168            if let Some((idx, _)) = schema.column_with_name(name) {
1169                // Compute the new column expression (with correct index) after projection:
1170                Ok(Transformed::yes(Arc::new(Column::new(name, idx))))
1171            } else {
1172                // Cannot find expression in the projected_schema,
1173                // signal this using an Err result
1174                plan_err!("")
1175            }
1176        });
1177
1178        match transformed {
1179            Ok(transformed) => {
1180                projected_exprs.push(PhysicalSortExpr::new(transformed.data, *options));
1181            }
1182            Err(_) => {
1183                // Err result indicates an expression could not be found in the
1184                // projected_schema, stop iterating since rest of the orderings are violated
1185                break;
1186            }
1187        }
1188    }
1189
1190    LexOrdering::new(projected_exprs)
1191}
1192
1193#[cfg(test)]
1194pub(crate) mod tests {
1195    use std::collections::HashMap;
1196
1197    use super::*;
1198    use crate::equivalence::{EquivalenceProperties, convert_to_orderings};
1199    use crate::expressions::{BinaryExpr, Literal, col};
1200    use crate::utils::tests::TestScalarUDF;
1201    use crate::{PhysicalExprRef, ScalarFunctionExpr};
1202
1203    use arrow::compute::SortOptions;
1204    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
1205    use datafusion_common::config::ConfigOptions;
1206    use datafusion_common::stats::Precision;
1207    use datafusion_common::{ScalarValue, Statistics};
1208    use datafusion_expr::{Operator, ScalarUDF};
1209    use insta::assert_snapshot;
1210
1211    pub(crate) fn output_schema(
1212        mapping: &ProjectionMapping,
1213        input_schema: &Arc<Schema>,
1214    ) -> Result<SchemaRef> {
1215        // Calculate output schema:
1216        let mut fields = vec![];
1217        for (source, targets) in mapping.iter() {
1218            let data_type = source.data_type(input_schema)?;
1219            let nullable = source.nullable(input_schema)?;
1220            for (target, _) in targets.iter() {
1221                let Some(column) = target.as_any().downcast_ref::<Column>() else {
1222                    return plan_err!("Expects to have column");
1223                };
1224                fields.push(Field::new(column.name(), data_type.clone(), nullable));
1225            }
1226        }
1227
1228        let output_schema = Arc::new(Schema::new_with_metadata(
1229            fields,
1230            input_schema.metadata().clone(),
1231        ));
1232
1233        Ok(output_schema)
1234    }
1235
1236    #[test]
1237    fn project_orderings() -> Result<()> {
1238        let schema = Arc::new(Schema::new(vec![
1239            Field::new("a", DataType::Int32, true),
1240            Field::new("b", DataType::Int32, true),
1241            Field::new("c", DataType::Int32, true),
1242            Field::new("d", DataType::Int32, true),
1243            Field::new("e", DataType::Int32, true),
1244            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
1245        ]));
1246        let col_a = &col("a", &schema)?;
1247        let col_b = &col("b", &schema)?;
1248        let col_c = &col("c", &schema)?;
1249        let col_d = &col("d", &schema)?;
1250        let col_e = &col("e", &schema)?;
1251        let col_ts = &col("ts", &schema)?;
1252        let a_plus_b = Arc::new(BinaryExpr::new(
1253            Arc::clone(col_a),
1254            Operator::Plus,
1255            Arc::clone(col_b),
1256        )) as Arc<dyn PhysicalExpr>;
1257        let b_plus_d = Arc::new(BinaryExpr::new(
1258            Arc::clone(col_b),
1259            Operator::Plus,
1260            Arc::clone(col_d),
1261        )) as Arc<dyn PhysicalExpr>;
1262        let b_plus_e = Arc::new(BinaryExpr::new(
1263            Arc::clone(col_b),
1264            Operator::Plus,
1265            Arc::clone(col_e),
1266        )) as Arc<dyn PhysicalExpr>;
1267        let c_plus_d = Arc::new(BinaryExpr::new(
1268            Arc::clone(col_c),
1269            Operator::Plus,
1270            Arc::clone(col_d),
1271        )) as Arc<dyn PhysicalExpr>;
1272
1273        let option_asc = SortOptions {
1274            descending: false,
1275            nulls_first: false,
1276        };
1277        let option_desc = SortOptions {
1278            descending: true,
1279            nulls_first: true,
1280        };
1281
1282        let test_cases = vec![
1283            // ---------- TEST CASE 1 ------------
1284            (
1285                // orderings
1286                vec![
1287                    // [b ASC]
1288                    vec![(col_b, option_asc)],
1289                ],
1290                // projection exprs
1291                vec![(col_b, "b_new".to_string()), (col_a, "a_new".to_string())],
1292                // expected
1293                vec![
1294                    // [b_new ASC]
1295                    vec![("b_new", option_asc)],
1296                ],
1297            ),
1298            // ---------- TEST CASE 2 ------------
1299            (
1300                // orderings
1301                vec![
1302                    // empty ordering
1303                ],
1304                // projection exprs
1305                vec![(col_c, "c_new".to_string()), (col_b, "b_new".to_string())],
1306                // expected
1307                vec![
1308                    // no ordering at the output
1309                ],
1310            ),
1311            // ---------- TEST CASE 3 ------------
1312            (
1313                // orderings
1314                vec![
1315                    // [ts ASC]
1316                    vec![(col_ts, option_asc)],
1317                ],
1318                // projection exprs
1319                vec![
1320                    (col_b, "b_new".to_string()),
1321                    (col_a, "a_new".to_string()),
1322                    (col_ts, "ts_new".to_string()),
1323                ],
1324                // expected
1325                vec![
1326                    // [ts_new ASC]
1327                    vec![("ts_new", option_asc)],
1328                ],
1329            ),
1330            // ---------- TEST CASE 4 ------------
1331            (
1332                // orderings
1333                vec![
1334                    // [a ASC, ts ASC]
1335                    vec![(col_a, option_asc), (col_ts, option_asc)],
1336                    // [b ASC, ts ASC]
1337                    vec![(col_b, option_asc), (col_ts, option_asc)],
1338                ],
1339                // projection exprs
1340                vec![
1341                    (col_b, "b_new".to_string()),
1342                    (col_a, "a_new".to_string()),
1343                    (col_ts, "ts_new".to_string()),
1344                ],
1345                // expected
1346                vec![
1347                    // [a_new ASC, ts_new ASC]
1348                    vec![("a_new", option_asc), ("ts_new", option_asc)],
1349                    // [b_new ASC, ts_new ASC]
1350                    vec![("b_new", option_asc), ("ts_new", option_asc)],
1351                ],
1352            ),
1353            // ---------- TEST CASE 5 ------------
1354            (
1355                // orderings
1356                vec![
1357                    // [a + b ASC]
1358                    vec![(&a_plus_b, option_asc)],
1359                ],
1360                // projection exprs
1361                vec![
1362                    (col_b, "b_new".to_string()),
1363                    (col_a, "a_new".to_string()),
1364                    (&a_plus_b, "a+b".to_string()),
1365                ],
1366                // expected
1367                vec![
1368                    // [a + b ASC]
1369                    vec![("a+b", option_asc)],
1370                ],
1371            ),
1372            // ---------- TEST CASE 6 ------------
1373            (
1374                // orderings
1375                vec![
1376                    // [a + b ASC, c ASC]
1377                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
1378                ],
1379                // projection exprs
1380                vec![
1381                    (col_b, "b_new".to_string()),
1382                    (col_a, "a_new".to_string()),
1383                    (col_c, "c_new".to_string()),
1384                    (&a_plus_b, "a+b".to_string()),
1385                ],
1386                // expected
1387                vec![
1388                    // [a + b ASC, c_new ASC]
1389                    vec![("a+b", option_asc), ("c_new", option_asc)],
1390                ],
1391            ),
1392            // ------- TEST CASE 7 ----------
1393            (
1394                vec![
1395                    // [a ASC, b ASC, c ASC]
1396                    vec![(col_a, option_asc), (col_b, option_asc)],
1397                    // [a ASC, d ASC]
1398                    vec![(col_a, option_asc), (col_d, option_asc)],
1399                ],
1400                // b as b_new, a as a_new, d as d_new b+d
1401                vec![
1402                    (col_b, "b_new".to_string()),
1403                    (col_a, "a_new".to_string()),
1404                    (col_d, "d_new".to_string()),
1405                    (&b_plus_d, "b+d".to_string()),
1406                ],
1407                // expected
1408                vec![
1409                    // [a_new ASC, b_new ASC]
1410                    vec![("a_new", option_asc), ("b_new", option_asc)],
1411                    // [a_new ASC, d_new ASC]
1412                    vec![("a_new", option_asc), ("d_new", option_asc)],
1413                    // [a_new ASC, b+d ASC]
1414                    vec![("a_new", option_asc), ("b+d", option_asc)],
1415                ],
1416            ),
1417            // ------- TEST CASE 8 ----------
1418            (
1419                // orderings
1420                vec![
1421                    // [b+d ASC]
1422                    vec![(&b_plus_d, option_asc)],
1423                ],
1424                // proj exprs
1425                vec![
1426                    (col_b, "b_new".to_string()),
1427                    (col_a, "a_new".to_string()),
1428                    (col_d, "d_new".to_string()),
1429                    (&b_plus_d, "b+d".to_string()),
1430                ],
1431                // expected
1432                vec![
1433                    // [b+d ASC]
1434                    vec![("b+d", option_asc)],
1435                ],
1436            ),
1437            // ------- TEST CASE 9 ----------
1438            (
1439                // orderings
1440                vec![
1441                    // [a ASC, d ASC, b ASC]
1442                    vec![
1443                        (col_a, option_asc),
1444                        (col_d, option_asc),
1445                        (col_b, option_asc),
1446                    ],
1447                    // [c ASC]
1448                    vec![(col_c, option_asc)],
1449                ],
1450                // proj exprs
1451                vec![
1452                    (col_b, "b_new".to_string()),
1453                    (col_a, "a_new".to_string()),
1454                    (col_d, "d_new".to_string()),
1455                    (col_c, "c_new".to_string()),
1456                ],
1457                // expected
1458                vec![
1459                    // [a_new ASC, d_new ASC, b_new ASC]
1460                    vec![
1461                        ("a_new", option_asc),
1462                        ("d_new", option_asc),
1463                        ("b_new", option_asc),
1464                    ],
1465                    // [c_new ASC],
1466                    vec![("c_new", option_asc)],
1467                ],
1468            ),
1469            // ------- TEST CASE 10 ----------
1470            (
1471                vec![
1472                    // [a ASC, b ASC, c ASC]
1473                    vec![
1474                        (col_a, option_asc),
1475                        (col_b, option_asc),
1476                        (col_c, option_asc),
1477                    ],
1478                    // [a ASC, d ASC]
1479                    vec![(col_a, option_asc), (col_d, option_asc)],
1480                ],
1481                // proj exprs
1482                vec![
1483                    (col_b, "b_new".to_string()),
1484                    (col_a, "a_new".to_string()),
1485                    (col_c, "c_new".to_string()),
1486                    (&c_plus_d, "c+d".to_string()),
1487                ],
1488                // expected
1489                vec![
1490                    // [a_new ASC, b_new ASC, c_new ASC]
1491                    vec![
1492                        ("a_new", option_asc),
1493                        ("b_new", option_asc),
1494                        ("c_new", option_asc),
1495                    ],
1496                    // [a_new ASC, b_new ASC, c+d ASC]
1497                    vec![
1498                        ("a_new", option_asc),
1499                        ("b_new", option_asc),
1500                        ("c+d", option_asc),
1501                    ],
1502                ],
1503            ),
1504            // ------- TEST CASE 11 ----------
1505            (
1506                // orderings
1507                vec![
1508                    // [a ASC, b ASC]
1509                    vec![(col_a, option_asc), (col_b, option_asc)],
1510                    // [a ASC, d ASC]
1511                    vec![(col_a, option_asc), (col_d, option_asc)],
1512                ],
1513                // proj exprs
1514                vec![
1515                    (col_b, "b_new".to_string()),
1516                    (col_a, "a_new".to_string()),
1517                    (&b_plus_d, "b+d".to_string()),
1518                ],
1519                // expected
1520                vec![
1521                    // [a_new ASC, b_new ASC]
1522                    vec![("a_new", option_asc), ("b_new", option_asc)],
1523                    // [a_new ASC, b + d ASC]
1524                    vec![("a_new", option_asc), ("b+d", option_asc)],
1525                ],
1526            ),
1527            // ------- TEST CASE 12 ----------
1528            (
1529                // orderings
1530                vec![
1531                    // [a ASC, b ASC, c ASC]
1532                    vec![
1533                        (col_a, option_asc),
1534                        (col_b, option_asc),
1535                        (col_c, option_asc),
1536                    ],
1537                ],
1538                // proj exprs
1539                vec![(col_c, "c_new".to_string()), (col_a, "a_new".to_string())],
1540                // expected
1541                vec![
1542                    // [a_new ASC]
1543                    vec![("a_new", option_asc)],
1544                ],
1545            ),
1546            // ------- TEST CASE 13 ----------
1547            (
1548                // orderings
1549                vec![
1550                    // [a ASC, b ASC, c ASC]
1551                    vec![
1552                        (col_a, option_asc),
1553                        (col_b, option_asc),
1554                        (col_c, option_asc),
1555                    ],
1556                    // [a ASC, a + b ASC, c ASC]
1557                    vec![
1558                        (col_a, option_asc),
1559                        (&a_plus_b, option_asc),
1560                        (col_c, option_asc),
1561                    ],
1562                ],
1563                // proj exprs
1564                vec![
1565                    (col_c, "c_new".to_string()),
1566                    (col_b, "b_new".to_string()),
1567                    (col_a, "a_new".to_string()),
1568                    (&a_plus_b, "a+b".to_string()),
1569                ],
1570                // expected
1571                vec![
1572                    // [a_new ASC, b_new ASC, c_new ASC]
1573                    vec![
1574                        ("a_new", option_asc),
1575                        ("b_new", option_asc),
1576                        ("c_new", option_asc),
1577                    ],
1578                    // [a_new ASC, a+b ASC, c_new ASC]
1579                    vec![
1580                        ("a_new", option_asc),
1581                        ("a+b", option_asc),
1582                        ("c_new", option_asc),
1583                    ],
1584                ],
1585            ),
1586            // ------- TEST CASE 14 ----------
1587            (
1588                // orderings
1589                vec![
1590                    // [a ASC, b ASC]
1591                    vec![(col_a, option_asc), (col_b, option_asc)],
1592                    // [c ASC, b ASC]
1593                    vec![(col_c, option_asc), (col_b, option_asc)],
1594                    // [d ASC, e ASC]
1595                    vec![(col_d, option_asc), (col_e, option_asc)],
1596                ],
1597                // proj exprs
1598                vec![
1599                    (col_c, "c_new".to_string()),
1600                    (col_d, "d_new".to_string()),
1601                    (col_a, "a_new".to_string()),
1602                    (&b_plus_e, "b+e".to_string()),
1603                ],
1604                // expected
1605                vec![
1606                    // [a_new ASC, d_new ASC, b+e ASC]
1607                    vec![
1608                        ("a_new", option_asc),
1609                        ("d_new", option_asc),
1610                        ("b+e", option_asc),
1611                    ],
1612                    // [d_new ASC, a_new ASC, b+e ASC]
1613                    vec![
1614                        ("d_new", option_asc),
1615                        ("a_new", option_asc),
1616                        ("b+e", option_asc),
1617                    ],
1618                    // [c_new ASC, d_new ASC, b+e ASC]
1619                    vec![
1620                        ("c_new", option_asc),
1621                        ("d_new", option_asc),
1622                        ("b+e", option_asc),
1623                    ],
1624                    // [d_new ASC, c_new ASC, b+e ASC]
1625                    vec![
1626                        ("d_new", option_asc),
1627                        ("c_new", option_asc),
1628                        ("b+e", option_asc),
1629                    ],
1630                ],
1631            ),
1632            // ------- TEST CASE 15 ----------
1633            (
1634                // orderings
1635                vec![
1636                    // [a ASC, c ASC, b ASC]
1637                    vec![
1638                        (col_a, option_asc),
1639                        (col_c, option_asc),
1640                        (col_b, option_asc),
1641                    ],
1642                ],
1643                // proj exprs
1644                vec![
1645                    (col_c, "c_new".to_string()),
1646                    (col_a, "a_new".to_string()),
1647                    (&a_plus_b, "a+b".to_string()),
1648                ],
1649                // expected
1650                vec![
1651                    // [a_new ASC, d_new ASC, b+e ASC]
1652                    vec![
1653                        ("a_new", option_asc),
1654                        ("c_new", option_asc),
1655                        ("a+b", option_asc),
1656                    ],
1657                ],
1658            ),
1659            // ------- TEST CASE 16 ----------
1660            (
1661                // orderings
1662                vec![
1663                    // [a ASC, b ASC]
1664                    vec![(col_a, option_asc), (col_b, option_asc)],
1665                    // [c ASC, b DESC]
1666                    vec![(col_c, option_asc), (col_b, option_desc)],
1667                    // [e ASC]
1668                    vec![(col_e, option_asc)],
1669                ],
1670                // proj exprs
1671                vec![
1672                    (col_c, "c_new".to_string()),
1673                    (col_a, "a_new".to_string()),
1674                    (col_b, "b_new".to_string()),
1675                    (&b_plus_e, "b+e".to_string()),
1676                ],
1677                // expected
1678                vec![
1679                    // [a_new ASC, b_new ASC]
1680                    vec![("a_new", option_asc), ("b_new", option_asc)],
1681                    // [a_new ASC, b_new ASC]
1682                    vec![("a_new", option_asc), ("b+e", option_asc)],
1683                    // [c_new ASC, b_new DESC]
1684                    vec![("c_new", option_asc), ("b_new", option_desc)],
1685                ],
1686            ),
1687        ];
1688
1689        for (idx, (orderings, proj_exprs, expected)) in test_cases.into_iter().enumerate()
1690        {
1691            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1692
1693            let orderings = convert_to_orderings(&orderings);
1694            eq_properties.add_orderings(orderings);
1695
1696            let proj_exprs = proj_exprs
1697                .into_iter()
1698                .map(|(expr, name)| (Arc::clone(expr), name));
1699            let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
1700            let output_schema = output_schema(&projection_mapping, &schema)?;
1701
1702            let expected = expected
1703                .into_iter()
1704                .map(|ordering| {
1705                    ordering
1706                        .into_iter()
1707                        .map(|(name, options)| {
1708                            (col(name, &output_schema).unwrap(), options)
1709                        })
1710                        .collect::<Vec<_>>()
1711                })
1712                .collect::<Vec<_>>();
1713            let expected = convert_to_orderings(&expected);
1714
1715            let projected_eq = eq_properties.project(&projection_mapping, output_schema);
1716            let orderings = projected_eq.oeq_class();
1717
1718            let err_msg = format!(
1719                "test_idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
1720            );
1721
1722            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
1723            for expected_ordering in &expected {
1724                assert!(orderings.contains(expected_ordering), "{}", err_msg)
1725            }
1726        }
1727
1728        Ok(())
1729    }
1730
1731    #[test]
1732    fn project_orderings2() -> Result<()> {
1733        let schema = Arc::new(Schema::new(vec![
1734            Field::new("a", DataType::Int32, true),
1735            Field::new("b", DataType::Int32, true),
1736            Field::new("c", DataType::Int32, true),
1737            Field::new("d", DataType::Int32, true),
1738            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
1739        ]));
1740        let col_a = &col("a", &schema)?;
1741        let col_b = &col("b", &schema)?;
1742        let col_c = &col("c", &schema)?;
1743        let col_ts = &col("ts", &schema)?;
1744        let a_plus_b = Arc::new(BinaryExpr::new(
1745            Arc::clone(col_a),
1746            Operator::Plus,
1747            Arc::clone(col_b),
1748        )) as Arc<dyn PhysicalExpr>;
1749
1750        let test_fun = Arc::new(ScalarUDF::new_from_impl(TestScalarUDF::new()));
1751
1752        let round_c = Arc::new(ScalarFunctionExpr::try_new(
1753            test_fun,
1754            vec![Arc::clone(col_c)],
1755            &schema,
1756            Arc::new(ConfigOptions::default()),
1757        )?) as PhysicalExprRef;
1758
1759        let option_asc = SortOptions {
1760            descending: false,
1761            nulls_first: false,
1762        };
1763
1764        let proj_exprs = vec![
1765            (col_b, "b_new".to_string()),
1766            (col_a, "a_new".to_string()),
1767            (col_c, "c_new".to_string()),
1768            (&round_c, "round_c_res".to_string()),
1769        ];
1770        let proj_exprs = proj_exprs
1771            .into_iter()
1772            .map(|(expr, name)| (Arc::clone(expr), name));
1773        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
1774        let output_schema = output_schema(&projection_mapping, &schema)?;
1775
1776        let col_a_new = &col("a_new", &output_schema)?;
1777        let col_b_new = &col("b_new", &output_schema)?;
1778        let col_c_new = &col("c_new", &output_schema)?;
1779        let col_round_c_res = &col("round_c_res", &output_schema)?;
1780        let a_new_plus_b_new = Arc::new(BinaryExpr::new(
1781            Arc::clone(col_a_new),
1782            Operator::Plus,
1783            Arc::clone(col_b_new),
1784        )) as Arc<dyn PhysicalExpr>;
1785
1786        let test_cases = [
1787            // ---------- TEST CASE 1 ------------
1788            (
1789                // orderings
1790                vec![
1791                    // [a ASC]
1792                    vec![(col_a, option_asc)],
1793                ],
1794                // expected
1795                vec![
1796                    // [b_new ASC]
1797                    vec![(col_a_new, option_asc)],
1798                ],
1799            ),
1800            // ---------- TEST CASE 2 ------------
1801            (
1802                // orderings
1803                vec![
1804                    // [a+b ASC]
1805                    vec![(&a_plus_b, option_asc)],
1806                ],
1807                // expected
1808                vec![
1809                    // [b_new ASC]
1810                    vec![(&a_new_plus_b_new, option_asc)],
1811                ],
1812            ),
1813            // ---------- TEST CASE 3 ------------
1814            (
1815                // orderings
1816                vec![
1817                    // [a ASC, ts ASC]
1818                    vec![(col_a, option_asc), (col_ts, option_asc)],
1819                ],
1820                // expected
1821                vec![
1822                    // [a_new ASC, date_bin_res ASC]
1823                    vec![(col_a_new, option_asc)],
1824                ],
1825            ),
1826            // ---------- TEST CASE 4 ------------
1827            (
1828                // orderings
1829                vec![
1830                    // [a ASC, ts ASC, b ASC]
1831                    vec![
1832                        (col_a, option_asc),
1833                        (col_ts, option_asc),
1834                        (col_b, option_asc),
1835                    ],
1836                ],
1837                // expected
1838                vec![
1839                    // [a_new ASC, date_bin_res ASC]
1840                    vec![(col_a_new, option_asc)],
1841                ],
1842            ),
1843            // ---------- TEST CASE 5 ------------
1844            (
1845                // orderings
1846                vec![
1847                    // [a ASC, c ASC]
1848                    vec![(col_a, option_asc), (col_c, option_asc)],
1849                ],
1850                // expected
1851                vec![
1852                    // [a_new ASC, round_c_res ASC, c_new ASC]
1853                    vec![(col_a_new, option_asc), (col_round_c_res, option_asc)],
1854                    // [a_new ASC, c_new ASC]
1855                    vec![(col_a_new, option_asc), (col_c_new, option_asc)],
1856                ],
1857            ),
1858            // ---------- TEST CASE 6 ------------
1859            (
1860                // orderings
1861                vec![
1862                    // [c ASC, b ASC]
1863                    vec![(col_c, option_asc), (col_b, option_asc)],
1864                ],
1865                // expected
1866                vec![
1867                    // [round_c_res ASC]
1868                    vec![(col_round_c_res, option_asc)],
1869                    // [c_new ASC, b_new ASC]
1870                    vec![(col_c_new, option_asc), (col_b_new, option_asc)],
1871                ],
1872            ),
1873            // ---------- TEST CASE 7 ------------
1874            (
1875                // orderings
1876                vec![
1877                    // [a+b ASC, c ASC]
1878                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
1879                ],
1880                // expected
1881                vec![
1882                    // [a+b ASC, round(c) ASC, c_new ASC]
1883                    vec![
1884                        (&a_new_plus_b_new, option_asc),
1885                        (col_round_c_res, option_asc),
1886                    ],
1887                    // [a+b ASC, c_new ASC]
1888                    vec![(&a_new_plus_b_new, option_asc), (col_c_new, option_asc)],
1889                ],
1890            ),
1891        ];
1892
1893        for (idx, (orderings, expected)) in test_cases.iter().enumerate() {
1894            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1895
1896            let orderings = convert_to_orderings(orderings);
1897            eq_properties.add_orderings(orderings);
1898
1899            let expected = convert_to_orderings(expected);
1900
1901            let projected_eq =
1902                eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
1903            let orderings = projected_eq.oeq_class();
1904
1905            let err_msg = format!(
1906                "test idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
1907            );
1908
1909            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
1910            for expected_ordering in &expected {
1911                assert!(orderings.contains(expected_ordering), "{}", err_msg)
1912            }
1913        }
1914        Ok(())
1915    }
1916
1917    #[test]
1918    fn project_orderings3() -> Result<()> {
1919        let schema = Arc::new(Schema::new(vec![
1920            Field::new("a", DataType::Int32, true),
1921            Field::new("b", DataType::Int32, true),
1922            Field::new("c", DataType::Int32, true),
1923            Field::new("d", DataType::Int32, true),
1924            Field::new("e", DataType::Int32, true),
1925            Field::new("f", DataType::Int32, true),
1926        ]));
1927        let col_a = &col("a", &schema)?;
1928        let col_b = &col("b", &schema)?;
1929        let col_c = &col("c", &schema)?;
1930        let col_d = &col("d", &schema)?;
1931        let col_e = &col("e", &schema)?;
1932        let col_f = &col("f", &schema)?;
1933        let a_plus_b = Arc::new(BinaryExpr::new(
1934            Arc::clone(col_a),
1935            Operator::Plus,
1936            Arc::clone(col_b),
1937        )) as Arc<dyn PhysicalExpr>;
1938
1939        let option_asc = SortOptions {
1940            descending: false,
1941            nulls_first: false,
1942        };
1943
1944        let proj_exprs = vec![
1945            (col_c, "c_new".to_string()),
1946            (col_d, "d_new".to_string()),
1947            (&a_plus_b, "a+b".to_string()),
1948        ];
1949        let proj_exprs = proj_exprs
1950            .into_iter()
1951            .map(|(expr, name)| (Arc::clone(expr), name));
1952        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
1953        let output_schema = output_schema(&projection_mapping, &schema)?;
1954
1955        let col_a_plus_b_new = &col("a+b", &output_schema)?;
1956        let col_c_new = &col("c_new", &output_schema)?;
1957        let col_d_new = &col("d_new", &output_schema)?;
1958
1959        let test_cases = vec![
1960            // ---------- TEST CASE 1 ------------
1961            (
1962                // orderings
1963                vec![
1964                    // [d ASC, b ASC]
1965                    vec![(col_d, option_asc), (col_b, option_asc)],
1966                    // [c ASC, a ASC]
1967                    vec![(col_c, option_asc), (col_a, option_asc)],
1968                ],
1969                // equal conditions
1970                vec![],
1971                // expected
1972                vec![
1973                    // [d_new ASC, c_new ASC, a+b ASC]
1974                    vec![
1975                        (col_d_new, option_asc),
1976                        (col_c_new, option_asc),
1977                        (col_a_plus_b_new, option_asc),
1978                    ],
1979                    // [c_new ASC, d_new ASC, a+b ASC]
1980                    vec![
1981                        (col_c_new, option_asc),
1982                        (col_d_new, option_asc),
1983                        (col_a_plus_b_new, option_asc),
1984                    ],
1985                ],
1986            ),
1987            // ---------- TEST CASE 2 ------------
1988            (
1989                // orderings
1990                vec![
1991                    // [d ASC, b ASC]
1992                    vec![(col_d, option_asc), (col_b, option_asc)],
1993                    // [c ASC, e ASC], Please note that a=e
1994                    vec![(col_c, option_asc), (col_e, option_asc)],
1995                ],
1996                // equal conditions
1997                vec![(col_e, col_a)],
1998                // expected
1999                vec![
2000                    // [d_new ASC, c_new ASC, a+b ASC]
2001                    vec![
2002                        (col_d_new, option_asc),
2003                        (col_c_new, option_asc),
2004                        (col_a_plus_b_new, option_asc),
2005                    ],
2006                    // [c_new ASC, d_new ASC, a+b ASC]
2007                    vec![
2008                        (col_c_new, option_asc),
2009                        (col_d_new, option_asc),
2010                        (col_a_plus_b_new, option_asc),
2011                    ],
2012                ],
2013            ),
2014            // ---------- TEST CASE 3 ------------
2015            (
2016                // orderings
2017                vec![
2018                    // [d ASC, b ASC]
2019                    vec![(col_d, option_asc), (col_b, option_asc)],
2020                    // [c ASC, e ASC], Please note that a=f
2021                    vec![(col_c, option_asc), (col_e, option_asc)],
2022                ],
2023                // equal conditions
2024                vec![(col_a, col_f)],
2025                // expected
2026                vec![
2027                    // [d_new ASC]
2028                    vec![(col_d_new, option_asc)],
2029                    // [c_new ASC]
2030                    vec![(col_c_new, option_asc)],
2031                ],
2032            ),
2033        ];
2034        for (orderings, equal_columns, expected) in test_cases {
2035            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
2036            for (lhs, rhs) in equal_columns {
2037                eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?;
2038            }
2039
2040            let orderings = convert_to_orderings(&orderings);
2041            eq_properties.add_orderings(orderings);
2042
2043            let expected = convert_to_orderings(&expected);
2044
2045            let projected_eq =
2046                eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
2047            let orderings = projected_eq.oeq_class();
2048
2049            let err_msg = format!(
2050                "actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
2051            );
2052
2053            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
2054            for expected_ordering in &expected {
2055                assert!(orderings.contains(expected_ordering), "{}", err_msg)
2056            }
2057        }
2058
2059        Ok(())
2060    }
2061
2062    fn get_stats() -> Statistics {
2063        Statistics {
2064            num_rows: Precision::Exact(5),
2065            total_byte_size: Precision::Exact(23),
2066            column_statistics: vec![
2067                ColumnStatistics {
2068                    distinct_count: Precision::Exact(5),
2069                    max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
2070                    min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
2071                    sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
2072                    null_count: Precision::Exact(0),
2073                    byte_size: Precision::Absent,
2074                },
2075                ColumnStatistics {
2076                    distinct_count: Precision::Exact(1),
2077                    max_value: Precision::Exact(ScalarValue::from("x")),
2078                    min_value: Precision::Exact(ScalarValue::from("a")),
2079                    sum_value: Precision::Absent,
2080                    null_count: Precision::Exact(3),
2081                    byte_size: Precision::Absent,
2082                },
2083                ColumnStatistics {
2084                    distinct_count: Precision::Absent,
2085                    max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
2086                    min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
2087                    sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
2088                    null_count: Precision::Absent,
2089                    byte_size: Precision::Absent,
2090                },
2091            ],
2092        }
2093    }
2094
2095    fn get_schema() -> Schema {
2096        let field_0 = Field::new("col0", DataType::Int64, false);
2097        let field_1 = Field::new("col1", DataType::Utf8, false);
2098        let field_2 = Field::new("col2", DataType::Float32, false);
2099        Schema::new(vec![field_0, field_1, field_2])
2100    }
2101
2102    #[test]
2103    fn test_stats_projection_columns_only() {
2104        let source = get_stats();
2105        let schema = get_schema();
2106
2107        let projection = ProjectionExprs::new(vec![
2108            ProjectionExpr {
2109                expr: Arc::new(Column::new("col1", 1)),
2110                alias: "col1".to_string(),
2111            },
2112            ProjectionExpr {
2113                expr: Arc::new(Column::new("col0", 0)),
2114                alias: "col0".to_string(),
2115            },
2116        ]);
2117
2118        let result = projection
2119            .project_statistics(source, &projection.project_schema(&schema).unwrap())
2120            .unwrap();
2121
2122        let expected = Statistics {
2123            num_rows: Precision::Exact(5),
2124            // Because there is a variable length Utf8 column we cannot calculate exact byte size after projection
2125            // Thus we set it to Inexact (originally it was Exact(23))
2126            total_byte_size: Precision::Inexact(23),
2127            column_statistics: vec![
2128                ColumnStatistics {
2129                    distinct_count: Precision::Exact(1),
2130                    max_value: Precision::Exact(ScalarValue::from("x")),
2131                    min_value: Precision::Exact(ScalarValue::from("a")),
2132                    sum_value: Precision::Absent,
2133                    null_count: Precision::Exact(3),
2134                    byte_size: Precision::Absent,
2135                },
2136                ColumnStatistics {
2137                    distinct_count: Precision::Exact(5),
2138                    max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
2139                    min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
2140                    sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
2141                    null_count: Precision::Exact(0),
2142                    byte_size: Precision::Absent,
2143                },
2144            ],
2145        };
2146
2147        assert_eq!(result, expected);
2148    }
2149
2150    #[test]
2151    fn test_stats_projection_column_with_primitive_width_only() {
2152        let source = get_stats();
2153        let schema = get_schema();
2154
2155        let projection = ProjectionExprs::new(vec![
2156            ProjectionExpr {
2157                expr: Arc::new(Column::new("col2", 2)),
2158                alias: "col2".to_string(),
2159            },
2160            ProjectionExpr {
2161                expr: Arc::new(Column::new("col0", 0)),
2162                alias: "col0".to_string(),
2163            },
2164        ]);
2165
2166        let result = projection
2167            .project_statistics(source, &projection.project_schema(&schema).unwrap())
2168            .unwrap();
2169
2170        let expected = Statistics {
2171            num_rows: Precision::Exact(5),
2172            total_byte_size: Precision::Exact(60),
2173            column_statistics: vec![
2174                ColumnStatistics {
2175                    distinct_count: Precision::Absent,
2176                    max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
2177                    min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
2178                    sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
2179                    null_count: Precision::Absent,
2180                    byte_size: Precision::Absent,
2181                },
2182                ColumnStatistics {
2183                    distinct_count: Precision::Exact(5),
2184                    max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
2185                    min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
2186                    sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
2187                    null_count: Precision::Exact(0),
2188                    byte_size: Precision::Absent,
2189                },
2190            ],
2191        };
2192
2193        assert_eq!(result, expected);
2194    }
2195
    // Tests for the `ProjectionExprs` struct
2197
2198    #[test]
2199    fn test_projection_new() -> Result<()> {
2200        let exprs = vec![
2201            ProjectionExpr {
2202                expr: Arc::new(Column::new("a", 0)),
2203                alias: "a".to_string(),
2204            },
2205            ProjectionExpr {
2206                expr: Arc::new(Column::new("b", 1)),
2207                alias: "b".to_string(),
2208            },
2209        ];
2210        let projection = ProjectionExprs::new(exprs.clone());
2211        assert_eq!(projection.as_ref().len(), 2);
2212        Ok(())
2213    }
2214
2215    #[test]
2216    fn test_projection_from_vec() -> Result<()> {
2217        let exprs = vec![ProjectionExpr {
2218            expr: Arc::new(Column::new("x", 0)),
2219            alias: "x".to_string(),
2220        }];
2221        let projection: ProjectionExprs = exprs.clone().into();
2222        assert_eq!(projection.as_ref().len(), 1);
2223        Ok(())
2224    }
2225
2226    #[test]
2227    fn test_projection_as_ref() -> Result<()> {
2228        let exprs = vec![
2229            ProjectionExpr {
2230                expr: Arc::new(Column::new("col1", 0)),
2231                alias: "col1".to_string(),
2232            },
2233            ProjectionExpr {
2234                expr: Arc::new(Column::new("col2", 1)),
2235                alias: "col2".to_string(),
2236            },
2237        ];
2238        let projection = ProjectionExprs::new(exprs);
2239        let as_ref: &[ProjectionExpr] = projection.as_ref();
2240        assert_eq!(as_ref.len(), 2);
2241        Ok(())
2242    }
2243
2244    #[test]
2245    fn test_column_indices_multiple_columns() -> Result<()> {
2246        // Test with reversed column order to ensure proper reordering
2247        let projection = ProjectionExprs::new(vec![
2248            ProjectionExpr {
2249                expr: Arc::new(Column::new("c", 5)),
2250                alias: "c".to_string(),
2251            },
2252            ProjectionExpr {
2253                expr: Arc::new(Column::new("b", 2)),
2254                alias: "b".to_string(),
2255            },
2256            ProjectionExpr {
2257                expr: Arc::new(Column::new("a", 0)),
2258                alias: "a".to_string(),
2259            },
2260        ]);
2261        // Should return sorted indices regardless of projection order
2262        assert_eq!(projection.column_indices(), vec![0, 2, 5]);
2263        Ok(())
2264    }
2265
2266    #[test]
2267    fn test_column_indices_duplicates() -> Result<()> {
2268        // Test that duplicate column indices appear only once
2269        let projection = ProjectionExprs::new(vec![
2270            ProjectionExpr {
2271                expr: Arc::new(Column::new("a", 1)),
2272                alias: "a".to_string(),
2273            },
2274            ProjectionExpr {
2275                expr: Arc::new(Column::new("b", 3)),
2276                alias: "b".to_string(),
2277            },
2278            ProjectionExpr {
2279                expr: Arc::new(Column::new("a2", 1)), // duplicate index
2280                alias: "a2".to_string(),
2281            },
2282        ]);
2283        assert_eq!(projection.column_indices(), vec![1, 3]);
2284        Ok(())
2285    }
2286
2287    #[test]
2288    fn test_column_indices_unsorted() -> Result<()> {
2289        // Test that column indices are sorted in the output
2290        let projection = ProjectionExprs::new(vec![
2291            ProjectionExpr {
2292                expr: Arc::new(Column::new("c", 5)),
2293                alias: "c".to_string(),
2294            },
2295            ProjectionExpr {
2296                expr: Arc::new(Column::new("a", 1)),
2297                alias: "a".to_string(),
2298            },
2299            ProjectionExpr {
2300                expr: Arc::new(Column::new("b", 3)),
2301                alias: "b".to_string(),
2302            },
2303        ]);
2304        assert_eq!(projection.column_indices(), vec![1, 3, 5]);
2305        Ok(())
2306    }
2307
2308    #[test]
2309    fn test_column_indices_complex_expr() -> Result<()> {
2310        // Test with complex expressions containing multiple columns
2311        let expr = Arc::new(BinaryExpr::new(
2312            Arc::new(Column::new("a", 1)),
2313            Operator::Plus,
2314            Arc::new(Column::new("b", 4)),
2315        ));
2316        let projection = ProjectionExprs::new(vec![
2317            ProjectionExpr {
2318                expr,
2319                alias: "sum".to_string(),
2320            },
2321            ProjectionExpr {
2322                expr: Arc::new(Column::new("c", 2)),
2323                alias: "c".to_string(),
2324            },
2325        ]);
2326        // Should return [1, 2, 4] - all columns used, sorted and deduplicated
2327        assert_eq!(projection.column_indices(), vec![1, 2, 4]);
2328        Ok(())
2329    }
2330
2331    #[test]
2332    fn test_column_indices_empty() -> Result<()> {
2333        let projection = ProjectionExprs::new(vec![]);
2334        assert_eq!(projection.column_indices(), Vec::<usize>::new());
2335        Ok(())
2336    }
2337
2338    #[test]
2339    fn test_merge_simple_columns() -> Result<()> {
2340        // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
2341        let base_projection = ProjectionExprs::new(vec![
2342            ProjectionExpr {
2343                expr: Arc::new(Column::new("c", 2)),
2344                alias: "x".to_string(),
2345            },
2346            ProjectionExpr {
2347                expr: Arc::new(Column::new("b", 1)),
2348                alias: "y".to_string(),
2349            },
2350            ProjectionExpr {
2351                expr: Arc::new(Column::new("a", 0)),
2352                alias: "z".to_string(),
2353            },
2354        ]);
2355
2356        // Second projection: SELECT y@1 AS col2, x@0 AS col1
2357        let top_projection = ProjectionExprs::new(vec![
2358            ProjectionExpr {
2359                expr: Arc::new(Column::new("y", 1)),
2360                alias: "col2".to_string(),
2361            },
2362            ProjectionExpr {
2363                expr: Arc::new(Column::new("x", 0)),
2364                alias: "col1".to_string(),
2365            },
2366        ]);
2367
2368        // Merge should produce: SELECT b@1 AS col2, c@2 AS col1
2369        let merged = base_projection.try_merge(&top_projection)?;
2370        assert_snapshot!(format!("{merged}"), @"Projection[b@1 AS col2, c@2 AS col1]");
2371
2372        Ok(())
2373    }
2374
2375    #[test]
2376    fn test_merge_with_expressions() -> Result<()> {
2377        // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
2378        let base_projection = ProjectionExprs::new(vec![
2379            ProjectionExpr {
2380                expr: Arc::new(Column::new("c", 2)),
2381                alias: "x".to_string(),
2382            },
2383            ProjectionExpr {
2384                expr: Arc::new(Column::new("b", 1)),
2385                alias: "y".to_string(),
2386            },
2387            ProjectionExpr {
2388                expr: Arc::new(Column::new("a", 0)),
2389                alias: "z".to_string(),
2390            },
2391        ]);
2392
2393        // Second projection: SELECT y@1 + z@2 AS c2, x@0 + 1 AS c1
2394        let top_projection = ProjectionExprs::new(vec![
2395            ProjectionExpr {
2396                expr: Arc::new(BinaryExpr::new(
2397                    Arc::new(Column::new("y", 1)),
2398                    Operator::Plus,
2399                    Arc::new(Column::new("z", 2)),
2400                )),
2401                alias: "c2".to_string(),
2402            },
2403            ProjectionExpr {
2404                expr: Arc::new(BinaryExpr::new(
2405                    Arc::new(Column::new("x", 0)),
2406                    Operator::Plus,
2407                    Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
2408                )),
2409                alias: "c1".to_string(),
2410            },
2411        ]);
2412
2413        // Merge should produce: SELECT b@1 + a@0 AS c2, c@2 + 1 AS c1
2414        let merged = base_projection.try_merge(&top_projection)?;
2415        assert_snapshot!(format!("{merged}"), @"Projection[b@1 + a@0 AS c2, c@2 + 1 AS c1]");
2416
2417        Ok(())
2418    }
2419
2420    #[test]
2421    fn try_merge_error() {
2422        // Create a base projection
2423        let base = ProjectionExprs::new(vec![
2424            ProjectionExpr {
2425                expr: Arc::new(Column::new("a", 0)),
2426                alias: "x".to_string(),
2427            },
2428            ProjectionExpr {
2429                expr: Arc::new(Column::new("b", 1)),
2430                alias: "y".to_string(),
2431            },
2432        ]);
2433
2434        // Create a top projection that references a non-existent column index
2435        let top = ProjectionExprs::new(vec![ProjectionExpr {
2436            expr: Arc::new(Column::new("z", 5)), // Invalid index
2437            alias: "result".to_string(),
2438        }]);
2439
2440        // Attempt to merge and expect an error
2441        let err_msg = base.try_merge(&top).unwrap_err().to_string();
2442        assert!(
2443            err_msg.contains("Internal error: Column index 5 out of bounds for projected expressions of length 2"),
2444            "Unexpected error message: {err_msg}",
2445        );
2446    }
2447
2448    #[test]
2449    fn test_merge_empty_projection_with_literal() -> Result<()> {
2450        // This test reproduces the issue from roundtrip_empty_projection test
2451        // Query like: SELECT 1 FROM table
2452        // where the file scan needs no columns (empty projection)
2453        // but we project a literal on top
2454
2455        // Empty base projection (no columns needed from file)
2456        let base_projection = ProjectionExprs::new(vec![]);
2457
2458        // Top projection with a literal expression: SELECT 1
2459        let top_projection = ProjectionExprs::new(vec![ProjectionExpr {
2460            expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2461            alias: "Int64(1)".to_string(),
2462        }]);
2463
2464        // This should succeed - literals don't reference columns so they should
2465        // pass through unchanged when merged with an empty projection
2466        let merged = base_projection.try_merge(&top_projection)?;
2467        assert_snapshot!(format!("{merged}"), @"Projection[1 AS Int64(1)]");
2468
2469        Ok(())
2470    }
2471
2472    #[test]
2473    fn test_update_expr_with_literal() -> Result<()> {
2474        // Test that update_expr correctly handles expressions without column references
2475        let literal_expr: Arc<dyn PhysicalExpr> =
2476            Arc::new(Literal::new(ScalarValue::Int64(Some(42))));
2477        let empty_projection: Vec<ProjectionExpr> = vec![];
2478
2479        // Updating a literal with an empty projection should return the literal unchanged
2480        let result = update_expr(&literal_expr, &empty_projection, true)?;
2481        assert!(result.is_some(), "Literal expression should be valid");
2482
2483        let result_expr = result.unwrap();
2484        assert_eq!(
2485            result_expr
2486                .as_any()
2487                .downcast_ref::<Literal>()
2488                .unwrap()
2489                .value(),
2490            &ScalarValue::Int64(Some(42))
2491        );
2492
2493        Ok(())
2494    }
2495
2496    #[test]
2497    fn test_update_expr_with_complex_literal_expr() -> Result<()> {
2498        // Test update_expr with an expression containing both literals and a column
2499        // This tests the case where we have: literal + column
2500        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
2501            Arc::new(Literal::new(ScalarValue::Int64(Some(10)))),
2502            Operator::Plus,
2503            Arc::new(Column::new("x", 0)),
2504        ));
2505
2506        // Base projection that maps column 0 to a different expression
2507        let base_projection = vec![ProjectionExpr {
2508            expr: Arc::new(Column::new("a", 5)),
2509            alias: "x".to_string(),
2510        }];
2511
2512        // The expression should be updated: 10 + x@0 becomes 10 + a@5
2513        let result = update_expr(&expr, &base_projection, true)?;
2514        assert!(result.is_some(), "Expression should be valid");
2515
2516        let result_expr = result.unwrap();
2517        let binary = result_expr
2518            .as_any()
2519            .downcast_ref::<BinaryExpr>()
2520            .expect("Should be a BinaryExpr");
2521
2522        // Left side should still be the literal
2523        assert!(binary.left().as_any().downcast_ref::<Literal>().is_some());
2524
2525        // Right side should be updated to reference column at index 5
2526        let right_col = binary
2527            .right()
2528            .as_any()
2529            .downcast_ref::<Column>()
2530            .expect("Right should be a Column");
2531        assert_eq!(right_col.index(), 5);
2532
2533        Ok(())
2534    }
2535
2536    #[test]
2537    fn test_project_schema_simple_columns() -> Result<()> {
2538        // Input schema: [col0: Int64, col1: Utf8, col2: Float32]
2539        let input_schema = get_schema();
2540
2541        // Projection: SELECT col2 AS c, col0 AS a
2542        let projection = ProjectionExprs::new(vec![
2543            ProjectionExpr {
2544                expr: Arc::new(Column::new("col2", 2)),
2545                alias: "c".to_string(),
2546            },
2547            ProjectionExpr {
2548                expr: Arc::new(Column::new("col0", 0)),
2549                alias: "a".to_string(),
2550            },
2551        ]);
2552
2553        let output_schema = projection.project_schema(&input_schema)?;
2554
2555        // Should have 2 fields
2556        assert_eq!(output_schema.fields().len(), 2);
2557
2558        // First field should be "c" with Float32 type
2559        assert_eq!(output_schema.field(0).name(), "c");
2560        assert_eq!(output_schema.field(0).data_type(), &DataType::Float32);
2561
2562        // Second field should be "a" with Int64 type
2563        assert_eq!(output_schema.field(1).name(), "a");
2564        assert_eq!(output_schema.field(1).data_type(), &DataType::Int64);
2565
2566        Ok(())
2567    }
2568
2569    #[test]
2570    fn test_project_schema_with_expressions() -> Result<()> {
2571        // Input schema: [col0: Int64, col1: Utf8, col2: Float32]
2572        let input_schema = get_schema();
2573
2574        // Projection: SELECT col0 + 1 AS incremented
2575        let projection = ProjectionExprs::new(vec![ProjectionExpr {
2576            expr: Arc::new(BinaryExpr::new(
2577                Arc::new(Column::new("col0", 0)),
2578                Operator::Plus,
2579                Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2580            )),
2581            alias: "incremented".to_string(),
2582        }]);
2583
2584        let output_schema = projection.project_schema(&input_schema)?;
2585
2586        // Should have 1 field
2587        assert_eq!(output_schema.fields().len(), 1);
2588
2589        // Field should be "incremented" with Int64 type
2590        assert_eq!(output_schema.field(0).name(), "incremented");
2591        assert_eq!(output_schema.field(0).data_type(), &DataType::Int64);
2592
2593        Ok(())
2594    }
2595
2596    #[test]
2597    fn test_project_schema_preserves_metadata() -> Result<()> {
2598        // Create schema with metadata
2599        let mut metadata = HashMap::new();
2600        metadata.insert("key".to_string(), "value".to_string());
2601        let field_with_metadata =
2602            Field::new("col0", DataType::Int64, false).with_metadata(metadata.clone());
2603        let input_schema = Schema::new(vec![
2604            field_with_metadata,
2605            Field::new("col1", DataType::Utf8, false),
2606        ]);
2607
2608        // Projection: SELECT col0 AS renamed
2609        let projection = ProjectionExprs::new(vec![ProjectionExpr {
2610            expr: Arc::new(Column::new("col0", 0)),
2611            alias: "renamed".to_string(),
2612        }]);
2613
2614        let output_schema = projection.project_schema(&input_schema)?;
2615
2616        // Should have 1 field
2617        assert_eq!(output_schema.fields().len(), 1);
2618
2619        // Field should be "renamed" with metadata preserved
2620        assert_eq!(output_schema.field(0).name(), "renamed");
2621        assert_eq!(output_schema.field(0).metadata(), &metadata);
2622
2623        Ok(())
2624    }
2625
2626    #[test]
2627    fn test_project_schema_empty() -> Result<()> {
2628        let input_schema = get_schema();
2629        let projection = ProjectionExprs::new(vec![]);
2630
2631        let output_schema = projection.project_schema(&input_schema)?;
2632
2633        assert_eq!(output_schema.fields().len(), 0);
2634
2635        Ok(())
2636    }
2637
2638    #[test]
2639    fn test_project_statistics_columns_only() -> Result<()> {
2640        let input_stats = get_stats();
2641        let input_schema = get_schema();
2642
2643        // Projection: SELECT col1 AS text, col0 AS num
2644        let projection = ProjectionExprs::new(vec![
2645            ProjectionExpr {
2646                expr: Arc::new(Column::new("col1", 1)),
2647                alias: "text".to_string(),
2648            },
2649            ProjectionExpr {
2650                expr: Arc::new(Column::new("col0", 0)),
2651                alias: "num".to_string(),
2652            },
2653        ]);
2654
2655        let output_stats = projection.project_statistics(
2656            input_stats,
2657            &projection.project_schema(&input_schema)?,
2658        )?;
2659
2660        // Row count should be preserved
2661        assert_eq!(output_stats.num_rows, Precision::Exact(5));
2662
2663        // Should have 2 column statistics (reordered from input)
2664        assert_eq!(output_stats.column_statistics.len(), 2);
2665
2666        // First column (col1 from input)
2667        assert_eq!(
2668            output_stats.column_statistics[0].distinct_count,
2669            Precision::Exact(1)
2670        );
2671        assert_eq!(
2672            output_stats.column_statistics[0].max_value,
2673            Precision::Exact(ScalarValue::from("x"))
2674        );
2675
2676        // Second column (col0 from input)
2677        assert_eq!(
2678            output_stats.column_statistics[1].distinct_count,
2679            Precision::Exact(5)
2680        );
2681        assert_eq!(
2682            output_stats.column_statistics[1].max_value,
2683            Precision::Exact(ScalarValue::Int64(Some(21)))
2684        );
2685
2686        Ok(())
2687    }
2688
2689    #[test]
2690    fn test_project_statistics_with_expressions() -> Result<()> {
2691        let input_stats = get_stats();
2692        let input_schema = get_schema();
2693
2694        // Projection with expression: SELECT col0 + 1 AS incremented, col1 AS text
2695        let projection = ProjectionExprs::new(vec![
2696            ProjectionExpr {
2697                expr: Arc::new(BinaryExpr::new(
2698                    Arc::new(Column::new("col0", 0)),
2699                    Operator::Plus,
2700                    Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2701                )),
2702                alias: "incremented".to_string(),
2703            },
2704            ProjectionExpr {
2705                expr: Arc::new(Column::new("col1", 1)),
2706                alias: "text".to_string(),
2707            },
2708        ]);
2709
2710        let output_stats = projection.project_statistics(
2711            input_stats,
2712            &projection.project_schema(&input_schema)?,
2713        )?;
2714
2715        // Row count should be preserved
2716        assert_eq!(output_stats.num_rows, Precision::Exact(5));
2717
2718        // Should have 2 column statistics
2719        assert_eq!(output_stats.column_statistics.len(), 2);
2720
2721        // First column (expression) should have unknown statistics
2722        assert_eq!(
2723            output_stats.column_statistics[0].distinct_count,
2724            Precision::Absent
2725        );
2726        assert_eq!(
2727            output_stats.column_statistics[0].max_value,
2728            Precision::Absent
2729        );
2730
2731        // Second column (col1) should preserve statistics
2732        assert_eq!(
2733            output_stats.column_statistics[1].distinct_count,
2734            Precision::Exact(1)
2735        );
2736
2737        Ok(())
2738    }
2739
2740    #[test]
2741    fn test_project_statistics_primitive_width_only() -> Result<()> {
2742        let input_stats = get_stats();
2743        let input_schema = get_schema();
2744
2745        // Projection with only primitive width columns: SELECT col2 AS f, col0 AS i
2746        let projection = ProjectionExprs::new(vec![
2747            ProjectionExpr {
2748                expr: Arc::new(Column::new("col2", 2)),
2749                alias: "f".to_string(),
2750            },
2751            ProjectionExpr {
2752                expr: Arc::new(Column::new("col0", 0)),
2753                alias: "i".to_string(),
2754            },
2755        ]);
2756
2757        let output_stats = projection.project_statistics(
2758            input_stats,
2759            &projection.project_schema(&input_schema)?,
2760        )?;
2761
2762        // Row count should be preserved
2763        assert_eq!(output_stats.num_rows, Precision::Exact(5));
2764
2765        // Total byte size should be recalculated for primitive types
2766        // Float32 (4 bytes) + Int64 (8 bytes) = 12 bytes per row, 5 rows = 60 bytes
2767        assert_eq!(output_stats.total_byte_size, Precision::Exact(60));
2768
2769        // Should have 2 column statistics
2770        assert_eq!(output_stats.column_statistics.len(), 2);
2771
2772        Ok(())
2773    }
2774
2775    #[test]
2776    fn test_project_statistics_empty() -> Result<()> {
2777        let input_stats = get_stats();
2778        let input_schema = get_schema();
2779
2780        let projection = ProjectionExprs::new(vec![]);
2781
2782        let output_stats = projection.project_statistics(
2783            input_stats,
2784            &projection.project_schema(&input_schema)?,
2785        )?;
2786
2787        // Row count should be preserved
2788        assert_eq!(output_stats.num_rows, Precision::Exact(5));
2789
2790        // Should have no column statistics
2791        assert_eq!(output_stats.column_statistics.len(), 0);
2792
2793        // Total byte size should be 0 for empty projection
2794        assert_eq!(output_stats.total_byte_size, Precision::Exact(0));
2795
2796        Ok(())
2797    }
2798
2799    // Test statistics calculation for non-null literal (numeric constant)
2800    #[test]
2801    fn test_project_statistics_with_literal() -> Result<()> {
2802        let input_stats = get_stats();
2803        let input_schema = get_schema();
2804
2805        // Projection with literal: SELECT 42 AS constant, col0 AS num
2806        let projection = ProjectionExprs::new(vec![
2807            ProjectionExpr {
2808                expr: Arc::new(Literal::new(ScalarValue::Int64(Some(42)))),
2809                alias: "constant".to_string(),
2810            },
2811            ProjectionExpr {
2812                expr: Arc::new(Column::new("col0", 0)),
2813                alias: "num".to_string(),
2814            },
2815        ]);
2816
2817        let output_stats = projection.project_statistics(
2818            input_stats,
2819            &projection.project_schema(&input_schema)?,
2820        )?;
2821
2822        // Row count should be preserved
2823        assert_eq!(output_stats.num_rows, Precision::Exact(5));
2824
2825        // Should have 2 column statistics
2826        assert_eq!(output_stats.column_statistics.len(), 2);
2827
2828        // First column (literal 42) should have proper constant statistics
2829        assert_eq!(
2830            output_stats.column_statistics[0].min_value,
2831            Precision::Exact(ScalarValue::Int64(Some(42)))
2832        );
2833        assert_eq!(
2834            output_stats.column_statistics[0].max_value,
2835            Precision::Exact(ScalarValue::Int64(Some(42)))
2836        );
2837        assert_eq!(
2838            output_stats.column_statistics[0].distinct_count,
2839            Precision::Exact(1)
2840        );
2841        assert_eq!(
2842            output_stats.column_statistics[0].null_count,
2843            Precision::Exact(0)
2844        );
2845        // Int64 is 8 bytes, 5 rows = 40 bytes
2846        assert_eq!(
2847            output_stats.column_statistics[0].byte_size,
2848            Precision::Exact(40)
2849        );
2850        // For a constant column, sum_value = value * num_rows = 42 * 5 = 210
2851        assert_eq!(
2852            output_stats.column_statistics[0].sum_value,
2853            Precision::Exact(ScalarValue::Int64(Some(210)))
2854        );
2855
2856        // Second column (col0) should preserve statistics
2857        assert_eq!(
2858            output_stats.column_statistics[1].distinct_count,
2859            Precision::Exact(5)
2860        );
2861        assert_eq!(
2862            output_stats.column_statistics[1].max_value,
2863            Precision::Exact(ScalarValue::Int64(Some(21)))
2864        );
2865
2866        Ok(())
2867    }
2868
2869    // Test statistics calculation for NULL literal (constant NULL column)
2870    #[test]
2871    fn test_project_statistics_with_null_literal() -> Result<()> {
2872        let input_stats = get_stats();
2873        let input_schema = get_schema();
2874
2875        // Projection with NULL literal: SELECT NULL AS null_col, col0 AS num
2876        let projection = ProjectionExprs::new(vec![
2877            ProjectionExpr {
2878                expr: Arc::new(Literal::new(ScalarValue::Int64(None))),
2879                alias: "null_col".to_string(),
2880            },
2881            ProjectionExpr {
2882                expr: Arc::new(Column::new("col0", 0)),
2883                alias: "num".to_string(),
2884            },
2885        ]);
2886
2887        let output_stats = projection.project_statistics(
2888            input_stats,
2889            &projection.project_schema(&input_schema)?,
2890        )?;
2891
2892        // Row count should be preserved
2893        assert_eq!(output_stats.num_rows, Precision::Exact(5));
2894
2895        // Should have 2 column statistics
2896        assert_eq!(output_stats.column_statistics.len(), 2);
2897
2898        // First column (NULL literal) should have proper constant NULL statistics
2899        assert_eq!(
2900            output_stats.column_statistics[0].min_value,
2901            Precision::Exact(ScalarValue::Int64(None))
2902        );
2903        assert_eq!(
2904            output_stats.column_statistics[0].max_value,
2905            Precision::Exact(ScalarValue::Int64(None))
2906        );
2907        assert_eq!(
2908            output_stats.column_statistics[0].distinct_count,
2909            Precision::Exact(1) // All NULLs are considered the same
2910        );
2911        assert_eq!(
2912            output_stats.column_statistics[0].null_count,
2913            Precision::Exact(5) // All rows are NULL
2914        );
2915        assert_eq!(
2916            output_stats.column_statistics[0].byte_size,
2917            Precision::Exact(0)
2918        );
2919        assert_eq!(
2920            output_stats.column_statistics[0].sum_value,
2921            Precision::Exact(ScalarValue::Int64(None))
2922        );
2923
2924        // Second column (col0) should preserve statistics
2925        assert_eq!(
2926            output_stats.column_statistics[1].distinct_count,
2927            Precision::Exact(5)
2928        );
2929        assert_eq!(
2930            output_stats.column_statistics[1].max_value,
2931            Precision::Exact(ScalarValue::Int64(Some(21)))
2932        );
2933
2934        Ok(())
2935    }
2936
2937    // Test statistics calculation for complex type literal (e.g., Utf8 string)
2938    #[test]
2939    fn test_project_statistics_with_complex_type_literal() -> Result<()> {
2940        let input_stats = get_stats();
2941        let input_schema = get_schema();
2942
2943        // Projection with Utf8 literal (complex type): SELECT 'hello' AS text, col0 AS num
2944        let projection = ProjectionExprs::new(vec![
2945            ProjectionExpr {
2946                expr: Arc::new(Literal::new(ScalarValue::Utf8(Some(
2947                    "hello".to_string(),
2948                )))),
2949                alias: "text".to_string(),
2950            },
2951            ProjectionExpr {
2952                expr: Arc::new(Column::new("col0", 0)),
2953                alias: "num".to_string(),
2954            },
2955        ]);
2956
2957        let output_stats = projection.project_statistics(
2958            input_stats,
2959            &projection.project_schema(&input_schema)?,
2960        )?;
2961
2962        // Row count should be preserved
2963        assert_eq!(output_stats.num_rows, Precision::Exact(5));
2964
2965        // Should have 2 column statistics
2966        assert_eq!(output_stats.column_statistics.len(), 2);
2967
2968        // First column (Utf8 literal 'hello') should have proper constant statistics
2969        // but byte_size should be Absent for complex types
2970        assert_eq!(
2971            output_stats.column_statistics[0].min_value,
2972            Precision::Exact(ScalarValue::Utf8(Some("hello".to_string())))
2973        );
2974        assert_eq!(
2975            output_stats.column_statistics[0].max_value,
2976            Precision::Exact(ScalarValue::Utf8(Some("hello".to_string())))
2977        );
2978        assert_eq!(
2979            output_stats.column_statistics[0].distinct_count,
2980            Precision::Exact(1)
2981        );
2982        assert_eq!(
2983            output_stats.column_statistics[0].null_count,
2984            Precision::Exact(0)
2985        );
2986        // Complex types (Utf8, List, etc.) should have byte_size = Absent
2987        // because we can't calculate exact size without knowing the actual data
2988        assert_eq!(
2989            output_stats.column_statistics[0].byte_size,
2990            Precision::Absent
2991        );
2992        // Non-numeric types (Utf8) should have sum_value = Absent
2993        // because sum is only meaningful for numeric types
2994        assert_eq!(
2995            output_stats.column_statistics[0].sum_value,
2996            Precision::Absent
2997        );
2998
2999        // Second column (col0) should preserve statistics
3000        assert_eq!(
3001            output_stats.column_statistics[1].distinct_count,
3002            Precision::Exact(5)
3003        );
3004        assert_eq!(
3005            output_stats.column_statistics[1].max_value,
3006            Precision::Exact(ScalarValue::Int64(Some(21)))
3007        );
3008
3009        Ok(())
3010    }
3011}