datafusion_physical_expr/
projection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ProjectionExpr`] and [`ProjectionExprs`] for representing projections.
19
20use std::ops::Deref;
21use std::sync::Arc;
22
23use crate::PhysicalExpr;
24use crate::expressions::{Column, Literal};
25use crate::utils::collect_columns;
26
27use arrow::array::{RecordBatch, RecordBatchOptions};
28use arrow::datatypes::{Field, Schema, SchemaRef};
29use datafusion_common::stats::{ColumnStatistics, Precision};
30use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
31use datafusion_common::{
32    Result, ScalarValue, assert_or_internal_err, internal_datafusion_err, plan_err,
33};
34
35use datafusion_physical_expr_common::metrics::ExecutionPlanMetricsSet;
36use datafusion_physical_expr_common::metrics::ExpressionEvaluatorMetrics;
37use datafusion_physical_expr_common::physical_expr::fmt_sql;
38use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
39use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays_with_metrics;
40use indexmap::IndexMap;
41use itertools::Itertools;
42
/// An expression used by projection operations.
///
/// The expression is evaluated and the result is stored in a column
/// with the name specified by `alias`.
///
/// For example, the SQL expression `a + b AS sum_ab` would be represented
/// as a `ProjectionExpr` where `expr` is the expression `a + b`
/// and `alias` is the string `sum_ab`.
///
/// Equality (`PartialEq` / `Eq`) is implemented by hand rather than derived
/// and compares both the expression and the alias.
///
/// See [`ProjectionExprs`] for a collection of projection expressions.
#[derive(Debug, Clone)]
pub struct ProjectionExpr {
    /// The expression that will be evaluated.
    pub expr: Arc<dyn PhysicalExpr>,
    /// The name of the output column, for use in an output schema.
    pub alias: String,
}
60
61impl PartialEq for ProjectionExpr {
62    fn eq(&self, other: &Self) -> bool {
63        let ProjectionExpr { expr, alias } = self;
64        expr.eq(&other.expr) && *alias == other.alias
65    }
66}
67
68impl Eq for ProjectionExpr {}
69
70impl std::fmt::Display for ProjectionExpr {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        if self.expr.to_string() == self.alias {
73            write!(f, "{}", self.alias)
74        } else {
75            write!(f, "{} AS {}", self.expr, self.alias)
76        }
77    }
78}
79
80impl ProjectionExpr {
81    /// Create a new projection expression
82    pub fn new(expr: Arc<dyn PhysicalExpr>, alias: impl Into<String>) -> Self {
83        let alias = alias.into();
84        Self { expr, alias }
85    }
86
87    /// Create a new projection expression from an expression and a schema using the expression's output field name as alias.
88    pub fn new_from_expression(
89        expr: Arc<dyn PhysicalExpr>,
90        schema: &Schema,
91    ) -> Result<Self> {
92        let field = expr.return_field(schema)?;
93        Ok(Self {
94            expr,
95            alias: field.name().to_string(),
96        })
97    }
98}
99
100impl From<(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
101    fn from(value: (Arc<dyn PhysicalExpr>, String)) -> Self {
102        Self::new(value.0, value.1)
103    }
104}
105
106impl From<&(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
107    fn from(value: &(Arc<dyn PhysicalExpr>, String)) -> Self {
108        Self::new(Arc::clone(&value.0), value.1.clone())
109    }
110}
111
112impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
113    fn from(value: ProjectionExpr) -> Self {
114        (value.expr, value.alias)
115    }
116}
117
/// A collection of [`ProjectionExpr`] instances, representing a complete
/// projection operation.
///
/// Projection operations are used in query plans to select specific columns or
/// compute new columns based on existing ones.
///
/// See [`ProjectionExprs::from_indices`] to select a subset of columns by
/// indices.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionExprs {
    /// The projection expressions, in output column order.
    exprs: Vec<ProjectionExpr>,
}
130
131impl std::fmt::Display for ProjectionExprs {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        let exprs: Vec<String> = self.exprs.iter().map(|e| e.to_string()).collect();
134        write!(f, "Projection[{}]", exprs.join(", "))
135    }
136}
137
138impl From<Vec<ProjectionExpr>> for ProjectionExprs {
139    fn from(value: Vec<ProjectionExpr>) -> Self {
140        Self { exprs: value }
141    }
142}
143
144impl From<&[ProjectionExpr]> for ProjectionExprs {
145    fn from(value: &[ProjectionExpr]) -> Self {
146        Self {
147            exprs: value.to_vec(),
148        }
149    }
150}
151
152impl FromIterator<ProjectionExpr> for ProjectionExprs {
153    fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self {
154        Self {
155            exprs: exprs.into_iter().collect::<Vec<_>>(),
156        }
157    }
158}
159
160impl AsRef<[ProjectionExpr]> for ProjectionExprs {
161    fn as_ref(&self) -> &[ProjectionExpr] {
162        &self.exprs
163    }
164}
165
166impl ProjectionExprs {
167    pub fn new<I>(exprs: I) -> Self
168    where
169        I: IntoIterator<Item = ProjectionExpr>,
170    {
171        Self {
172            exprs: exprs.into_iter().collect::<Vec<_>>(),
173        }
174    }
175
    /// Creates a [`ProjectionExprs`] from a list of column indices.
177    ///
178    /// This is a convenience method for creating simple column-only projections, where each projection expression is a reference to a column
179    /// in the input schema.
180    ///
181    /// # Behavior
182    /// - Ordering: the output projection preserves the exact order of indices provided in the input slice
183    ///   For example, `[2, 0, 1]` will produce projections for columns 2, 0, then 1 in that order
184    /// - Duplicates: Duplicate indices are allowed and will create multiple projection expressions referencing the same source column
185    ///   For example, `[0, 0]` creates 2 separate projections both referencing column 0
186    ///
187    /// # Panics
188    /// Panics if any index in `indices` is out of bounds for the provided schema.
189    ///
190    /// # Example
191    ///
192    /// ```rust
193    /// use arrow::datatypes::{DataType, Field, Schema};
194    /// use datafusion_physical_expr::projection::ProjectionExprs;
195    /// use std::sync::Arc;
196    ///
197    /// // Create a schema with three columns
198    /// let schema = Arc::new(Schema::new(vec![
199    ///     Field::new("a", DataType::Int32, false),
200    ///     Field::new("b", DataType::Utf8, false),
201    ///     Field::new("c", DataType::Float64, false),
202    /// ]));
203    ///
204    /// // Project columns at indices 2 and 0 (c and a) - ordering is preserved
205    /// let projection = ProjectionExprs::from_indices(&[2, 0], &schema);
206    ///
207    /// // This creates: SELECT c@2 AS c, a@0 AS a
208    /// assert_eq!(projection.as_ref().len(), 2);
209    /// assert_eq!(projection.as_ref()[0].alias, "c");
210    /// assert_eq!(projection.as_ref()[1].alias, "a");
211    ///
212    /// // Duplicate indices are allowed
213    /// let projection_with_dups = ProjectionExprs::from_indices(&[0, 0, 1], &schema);
214    /// assert_eq!(projection_with_dups.as_ref().len(), 3);
215    /// assert_eq!(projection_with_dups.as_ref()[0].alias, "a");
216    /// assert_eq!(projection_with_dups.as_ref()[1].alias, "a"); // duplicate
217    /// assert_eq!(projection_with_dups.as_ref()[2].alias, "b");
218    /// ```
219    pub fn from_indices(indices: &[usize], schema: &Schema) -> Self {
220        let projection_exprs = indices.iter().map(|&i| {
221            let field = schema.field(i);
222            ProjectionExpr {
223                expr: Arc::new(Column::new(field.name(), i)),
224                alias: field.name().clone(),
225            }
226        });
227
228        Self::from_iter(projection_exprs)
229    }
230
231    /// Returns an iterator over the projection expressions
232    pub fn iter(&self) -> impl Iterator<Item = &ProjectionExpr> {
233        self.exprs.iter()
234    }
235
236    /// Creates a ProjectionMapping from this projection
237    pub fn projection_mapping(
238        &self,
239        input_schema: &SchemaRef,
240    ) -> Result<ProjectionMapping> {
241        ProjectionMapping::try_new(
242            self.exprs
243                .iter()
244                .map(|p| (Arc::clone(&p.expr), p.alias.clone())),
245            input_schema,
246        )
247    }
248
249    /// Iterate over a clone of the projection expressions.
250    pub fn expr_iter(&self) -> impl Iterator<Item = Arc<dyn PhysicalExpr>> + '_ {
251        self.exprs.iter().map(|e| Arc::clone(&e.expr))
252    }
253
254    /// Apply a fallible transformation to the [`PhysicalExpr`] of each projection.
255    ///
256    /// This method transforms the expression in each [`ProjectionExpr`] while preserving
257    /// the alias. This is useful for rewriting expressions, such as when adapting
258    /// expressions to a different schema.
259    ///
260    /// # Example
261    ///
262    /// ```rust
263    /// use std::sync::Arc;
264    /// use arrow::datatypes::{DataType, Field, Schema};
265    /// use datafusion_common::Result;
266    /// use datafusion_physical_expr::expressions::Column;
267    /// use datafusion_physical_expr::projection::ProjectionExprs;
268    /// use datafusion_physical_expr::PhysicalExpr;
269    ///
270    /// // Create a schema and projection
271    /// let schema = Arc::new(Schema::new(vec![
272    ///     Field::new("a", DataType::Int32, false),
273    ///     Field::new("b", DataType::Int32, false),
274    /// ]));
275    /// let projection = ProjectionExprs::from_indices(&[0, 1], &schema);
276    ///
277    /// // Transform each expression (this example just clones them)
278    /// let transformed = projection.try_map_exprs(|expr| Ok(expr))?;
279    /// assert_eq!(transformed.as_ref().len(), 2);
280    /// # Ok::<(), datafusion_common::DataFusionError>(())
281    /// ```
282    pub fn try_map_exprs<F>(self, mut f: F) -> Result<Self>
283    where
284        F: FnMut(Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>,
285    {
286        let exprs = self
287            .exprs
288            .into_iter()
289            .map(|mut proj| {
290                proj.expr = f(proj.expr)?;
291                Ok(proj)
292            })
293            .collect::<Result<Vec<_>>>()?;
294        Ok(Self::new(exprs))
295    }
296
297    /// Apply another projection on top of this projection, returning the combined projection.
298    /// For example, if this projection is `SELECT c@2 AS x, b@1 AS y, a@0 as z` and the other projection is `SELECT x@0 + 1 AS c1, y@1 + z@2 as c2`,
299    /// we return a projection equivalent to `SELECT c@2 + 1 AS c1, b@1 + a@0 as c2`.
300    ///
301    /// # Example
302    ///
303    /// ```rust
304    /// use datafusion_common::{Result, ScalarValue};
305    /// use datafusion_expr::Operator;
306    /// use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
307    /// use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
308    /// use std::sync::Arc;
309    ///
310    /// fn main() -> Result<()> {
311    ///     // Example from the docstring:
312    ///     // Base projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
313    ///     let base = ProjectionExprs::new(vec![
314    ///         ProjectionExpr {
315    ///             expr: Arc::new(Column::new("c", 2)),
316    ///             alias: "x".to_string(),
317    ///         },
318    ///         ProjectionExpr {
319    ///             expr: Arc::new(Column::new("b", 1)),
320    ///             alias: "y".to_string(),
321    ///         },
322    ///         ProjectionExpr {
323    ///             expr: Arc::new(Column::new("a", 0)),
324    ///             alias: "z".to_string(),
325    ///         },
326    ///     ]);
327    ///
328    ///     // Top projection: SELECT x@0 + 1 AS c1, y@1 + z@2 AS c2
329    ///     let top = ProjectionExprs::new(vec![
330    ///         ProjectionExpr {
331    ///             expr: Arc::new(BinaryExpr::new(
332    ///                 Arc::new(Column::new("x", 0)),
333    ///                 Operator::Plus,
334    ///                 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
335    ///             )),
336    ///             alias: "c1".to_string(),
337    ///         },
338    ///         ProjectionExpr {
339    ///             expr: Arc::new(BinaryExpr::new(
340    ///                 Arc::new(Column::new("y", 1)),
341    ///                 Operator::Plus,
342    ///                 Arc::new(Column::new("z", 2)),
343    ///             )),
344    ///             alias: "c2".to_string(),
345    ///         },
346    ///     ]);
347    ///
348    ///     // Expected result: SELECT c@2 + 1 AS c1, b@1 + a@0 AS c2
349    ///     let result = base.try_merge(&top)?;
350    ///
351    ///     assert_eq!(result.as_ref().len(), 2);
352    ///     assert_eq!(result.as_ref()[0].alias, "c1");
353    ///     assert_eq!(result.as_ref()[1].alias, "c2");
354    ///
355    ///     Ok(())
356    /// }
357    /// ```
358    ///
359    /// # Errors
360    /// This function returns an error if any expression in the `other` projection cannot be
361    /// applied on top of this projection.
362    pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
363        let mut new_exprs = Vec::with_capacity(other.exprs.len());
364        for proj_expr in &other.exprs {
365            let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
366                .ok_or_else(|| {
367                    internal_datafusion_err!(
368                        "Failed to combine projections: expression {} could not be applied on top of existing projections {}",
369                        proj_expr.expr,
370                        self.exprs.iter().map(|e| format!("{e}")).join(", ")
371                    )
372                })?;
373            new_exprs.push(ProjectionExpr {
374                expr: new_expr,
375                alias: proj_expr.alias.clone(),
376            });
377        }
378        Ok(ProjectionExprs::new(new_exprs))
379    }
380
381    /// Extract the column indices used in this projection.
382    /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
383    /// this function would return `[0, 1]`.
384    /// Repeated indices are returned only once, and the order is ascending.
385    pub fn column_indices(&self) -> Vec<usize> {
386        self.exprs
387            .iter()
388            .flat_map(|e| collect_columns(&e.expr).into_iter().map(|col| col.index()))
389            .sorted_unstable()
390            .dedup()
391            .collect_vec()
392    }
393
394    /// Extract the ordered column indices for a column-only projection.
395    ///
396    /// This function assumes that all expressions in the projection are simple column references.
397    /// It returns the column indices in the order they appear in the projection.
398    ///
399    /// # Panics
400    ///
401    /// Panics if any expression in the projection is not a simple column reference. This includes:
402    /// - Computed expressions (e.g., `a + 1`, `CAST(a AS INT)`)
403    /// - Function calls (e.g., `UPPER(name)`, `SUM(amount)`)
404    /// - Literals (e.g., `42`, `'hello'`)
405    /// - Complex nested expressions (e.g., `CASE WHEN ... THEN ... END`)
406    ///
407    /// # Returns
408    ///
409    /// A vector of column indices in projection order. Unlike [`column_indices()`](Self::column_indices),
410    /// this function:
411    /// - Preserves the projection order (does not sort)
412    /// - Preserves duplicates (does not deduplicate)
413    ///
414    /// # Example
415    ///
416    /// For a projection `SELECT c, a, c` where `a` is at index 0 and `c` is at index 2,
417    /// this function would return `[2, 0, 2]`.
418    ///
419    /// Use [`column_indices()`](Self::column_indices) instead if the projection may contain
420    /// non-column expressions or if you need a deduplicated sorted list.
421    ///
422    /// # Panics
423    ///
424    /// Panics if any expression in the projection is not a simple column reference.
425    #[deprecated(
426        since = "52.0.0",
427        note = "Use column_indices() instead. This method will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
428    )]
429    pub fn ordered_column_indices(&self) -> Vec<usize> {
430        self.exprs
431            .iter()
432            .map(|e| {
433                e.expr
434                    .as_any()
435                    .downcast_ref::<Column>()
436                    .expect("Expected column reference in projection")
437                    .index()
438            })
439            .collect()
440    }
441
442    /// Project a schema according to this projection.
443    /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
444    /// if the input schema is `[a: Int32, b: Int32, c: Int32]`, the output schema would be `[x: Int32, y: Int32]`.
445    /// Fields' metadata are preserved from the input schema.
446    pub fn project_schema(&self, input_schema: &Schema) -> Result<Schema> {
447        let fields: Result<Vec<Field>> = self
448            .exprs
449            .iter()
450            .map(|proj_expr| {
451                let metadata = proj_expr
452                    .expr
453                    .return_field(input_schema)?
454                    .metadata()
455                    .clone();
456
457                let field = Field::new(
458                    &proj_expr.alias,
459                    proj_expr.expr.data_type(input_schema)?,
460                    proj_expr.expr.nullable(input_schema)?,
461                )
462                .with_metadata(metadata);
463
464                Ok(field)
465            })
466            .collect();
467
468        Ok(Schema::new_with_metadata(
469            fields?,
470            input_schema.metadata().clone(),
471        ))
472    }
473
474    /// Create a new [`Projector`] from this projection and an input schema.
475    ///
476    /// A [`Projector`] can be used to apply this projection to record batches.
477    ///
478    /// # Errors
479    /// This function returns an error if the output schema cannot be constructed from the input schema
480    /// with the given projection expressions.
481    /// For example, if an expression only works with integer columns but the input schema has a string column at that index.
482    pub fn make_projector(&self, input_schema: &Schema) -> Result<Projector> {
483        let output_schema = Arc::new(self.project_schema(input_schema)?);
484        Ok(Projector {
485            projection: self.clone(),
486            output_schema,
487            expression_metrics: None,
488        })
489    }
490
491    pub fn create_expression_metrics(
492        &self,
493        metrics: &ExecutionPlanMetricsSet,
494        partition: usize,
495    ) -> ExpressionEvaluatorMetrics {
496        let labels: Vec<String> = self
497            .exprs
498            .iter()
499            .map(|proj_expr| {
500                let expr_sql = fmt_sql(proj_expr.expr.as_ref()).to_string();
501                if proj_expr.expr.to_string() == proj_expr.alias {
502                    expr_sql
503                } else {
504                    format!("{expr_sql} AS {}", proj_expr.alias)
505                }
506            })
507            .collect();
508        ExpressionEvaluatorMetrics::new(metrics, partition, labels)
509    }
510
511    /// Project statistics according to this projection.
512    /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
513    /// if the input statistics has column statistics for columns `a`, `b`, and `c`, the output statistics would have column statistics for columns `x` and `y`.
514    ///
515    /// # Example
516    ///
517    /// ```rust
518    /// use arrow::datatypes::{DataType, Field, Schema};
519    /// use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
520    /// use datafusion_physical_expr::projection::ProjectionExprs;
521    /// use datafusion_common::Result;
522    /// use datafusion_common::ScalarValue;
523    /// use std::sync::Arc;
524    ///
525    /// fn main() -> Result<()> {
526    ///     // Input schema: a: Int32, b: Int32, c: Int32
527    ///     let input_schema = Arc::new(Schema::new(vec![
528    ///         Field::new("a", DataType::Int32, false),
529    ///         Field::new("b", DataType::Int32, false),
530    ///         Field::new("c", DataType::Int32, false),
531    ///     ]));
532    ///
533    ///     // Input statistics with column stats for a, b, c
534    ///     let input_stats = Statistics {
535    ///         num_rows: Precision::Exact(100),
536    ///         total_byte_size: Precision::Exact(1200),
537    ///         column_statistics: vec![
538    ///             // Column a stats
539    ///             ColumnStatistics::new_unknown()
540    ///                 .with_null_count(Precision::Exact(0))
541    ///                 .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
542    ///                 .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
543    ///                 .with_distinct_count(Precision::Exact(100)),
544    ///             // Column b stats
545    ///             ColumnStatistics::new_unknown()
546    ///                 .with_null_count(Precision::Exact(0))
547    ///                 .with_min_value(Precision::Exact(ScalarValue::Int32(Some(10))))
548    ///                 .with_max_value(Precision::Exact(ScalarValue::Int32(Some(60))))
549    ///                 .with_distinct_count(Precision::Exact(50)),
550    ///             // Column c stats
551    ///             ColumnStatistics::new_unknown()
552    ///                 .with_null_count(Precision::Exact(5))
553    ///                 .with_min_value(Precision::Exact(ScalarValue::Int32(Some(-10))))
554    ///                 .with_max_value(Precision::Exact(ScalarValue::Int32(Some(200))))
555    ///                 .with_distinct_count(Precision::Exact(25)),
556    ///         ],
557    ///     };
558    ///
559    ///     // Create a projection that selects columns c and a (indices 2 and 0)
560    ///     let projection = ProjectionExprs::from_indices(&[2, 0], &input_schema);
561    ///
562    ///     // Compute output schema
563    ///     let output_schema = projection.project_schema(&input_schema)?;
564    ///
565    ///     // Project the statistics
566    ///     let output_stats = projection.project_statistics(input_stats, &output_schema)?;
567    ///
568    ///     // The output should have 2 column statistics (for c and a, in that order)
569    ///     assert_eq!(output_stats.column_statistics.len(), 2);
570    ///
571    ///     // First column in output is c (was at index 2)
572    ///     assert_eq!(
573    ///         output_stats.column_statistics[0].min_value,
574    ///         Precision::Exact(ScalarValue::Int32(Some(-10)))
575    ///     );
576    ///     assert_eq!(
577    ///         output_stats.column_statistics[0].null_count,
578    ///         Precision::Exact(5)
579    ///     );
580    ///
581    ///     // Second column in output is a (was at index 0)
582    ///     assert_eq!(
583    ///         output_stats.column_statistics[1].min_value,
584    ///         Precision::Exact(ScalarValue::Int32(Some(0)))
585    ///     );
586    ///     assert_eq!(
587    ///         output_stats.column_statistics[1].distinct_count,
588    ///         Precision::Exact(100)
589    ///     );
590    ///
591    ///     // Total byte size is recalculated based on projected columns
592    ///     assert_eq!(
593    ///         output_stats.total_byte_size,
594    ///         Precision::Exact(800), // each Int32 column is 4 bytes * 100 rows * 2 columns
595    ///     );
596    ///
597    ///     // Number of rows remains the same
598    ///     assert_eq!(output_stats.num_rows, Precision::Exact(100));
599    ///
600    ///     Ok(())
601    /// }
602    /// ```
603    pub fn project_statistics(
604        &self,
605        mut stats: datafusion_common::Statistics,
606        output_schema: &Schema,
607    ) -> Result<datafusion_common::Statistics> {
608        let mut column_statistics = vec![];
609
610        for proj_expr in &self.exprs {
611            let expr = &proj_expr.expr;
612            let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() {
613                std::mem::take(&mut stats.column_statistics[col.index()])
614            } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
615                // Handle literal expressions (constants) by calculating proper statistics
616                let data_type = expr.data_type(output_schema)?;
617
618                if literal.value().is_null() {
619                    let null_count = match stats.num_rows {
620                        Precision::Exact(num_rows) => Precision::Exact(num_rows),
621                        _ => Precision::Absent,
622                    };
623
624                    ColumnStatistics {
625                        min_value: Precision::Exact(literal.value().clone()),
626                        max_value: Precision::Exact(literal.value().clone()),
627                        distinct_count: Precision::Exact(1),
628                        null_count,
629                        sum_value: Precision::Exact(literal.value().clone()),
630                        byte_size: Precision::Exact(0),
631                    }
632                } else {
633                    let value = literal.value();
634                    let distinct_count = Precision::Exact(1);
635                    let null_count = Precision::Exact(0);
636
637                    let byte_size = if let Some(byte_width) = data_type.primitive_width()
638                    {
639                        stats.num_rows.multiply(&Precision::Exact(byte_width))
640                    } else {
641                        // Complex types depend on array encoding, so set to Absent
642                        Precision::Absent
643                    };
644
645                    let sum_value = Precision::<ScalarValue>::from(stats.num_rows)
646                        .cast_to(&value.data_type())
647                        .ok()
648                        .map(|row_count| {
649                            Precision::Exact(value.clone()).multiply(&row_count)
650                        })
651                        .unwrap_or(Precision::Absent);
652
653                    ColumnStatistics {
654                        min_value: Precision::Exact(value.clone()),
655                        max_value: Precision::Exact(value.clone()),
656                        distinct_count,
657                        null_count,
658                        sum_value,
659                        byte_size,
660                    }
661                }
662            } else {
663                // TODO stats: estimate more statistics from expressions
664                // (expressions should compute their statistics themselves)
665                ColumnStatistics::new_unknown()
666            };
667            column_statistics.push(col_stats);
668        }
669        stats.calculate_total_byte_size(output_schema);
670        stats.column_statistics = column_statistics;
671        Ok(stats)
672    }
673}
674
675impl<'a> IntoIterator for &'a ProjectionExprs {
676    type Item = &'a ProjectionExpr;
677    type IntoIter = std::slice::Iter<'a, ProjectionExpr>;
678
679    fn into_iter(self) -> Self::IntoIter {
680        self.exprs.iter()
681    }
682}
683
/// Applies a projection to record batches.
///
/// A [`Projector`] pairs a set of projection expressions with a
/// pre-computed output schema and uses both to project record batches.
///
/// The main reason to use a `Projector` is to avoid repeatedly computing
/// the output schema for each batch, which can be costly if the projection
/// expressions are complex.
#[derive(Clone, Debug)]
pub struct Projector {
    /// The projection expressions evaluated against each batch.
    projection: ProjectionExprs,
    /// The schema of every batch produced by this projector.
    output_schema: SchemaRef,
    /// If `Some`, metrics will be tracked for projection evaluation.
    expression_metrics: Option<ExpressionEvaluatorMetrics>,
}
699
700impl Projector {
701    /// Construct the projector with metrics. After execution, related metrics will
702    /// be tracked inside `ExecutionPlanMetricsSet`
703    ///
704    /// See [`ExpressionEvaluatorMetrics`] for details.
705    pub fn with_metrics(
706        &self,
707        metrics: &ExecutionPlanMetricsSet,
708        partition: usize,
709    ) -> Self {
710        let expr_metrics = self
711            .projection
712            .create_expression_metrics(metrics, partition);
713        Self {
714            expression_metrics: Some(expr_metrics),
715            projection: self.projection.clone(),
716            output_schema: Arc::clone(&self.output_schema),
717        }
718    }
719
720    /// Project a record batch according to this projector's expressions.
721    ///
722    /// # Errors
723    /// This function returns an error if any expression evaluation fails
724    /// or if the output schema of the resulting record batch does not match
725    /// the pre-computed output schema of the projector.
726    pub fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
727        let arrays = evaluate_expressions_to_arrays_with_metrics(
728            self.projection.exprs.iter().map(|p| &p.expr),
729            batch,
730            self.expression_metrics.as_ref(),
731        )?;
732
733        if arrays.is_empty() {
734            let options =
735                RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
736            RecordBatch::try_new_with_options(
737                Arc::clone(&self.output_schema),
738                arrays,
739                &options,
740            )
741            .map_err(Into::into)
742        } else {
743            RecordBatch::try_new(Arc::clone(&self.output_schema), arrays)
744                .map_err(Into::into)
745        }
746    }
747
748    pub fn output_schema(&self) -> &SchemaRef {
749        &self.output_schema
750    }
751
752    pub fn projection(&self) -> &ProjectionExprs {
753        &self.projection
754    }
755}
756
757impl IntoIterator for ProjectionExprs {
758    type Item = ProjectionExpr;
759    type IntoIter = std::vec::IntoIter<ProjectionExpr>;
760
761    fn into_iter(self) -> Self::IntoIter {
762        self.exprs.into_iter()
763    }
764}
765
766/// The function operates in two modes:
767///
768/// 1) When `sync_with_child` is `true`:
769///
770///    The function updates the indices of `expr` if the expression resides
771///    in the input plan. For instance, given the expressions `a@1 + b@2`
772///    and `c@0` with the input schema `c@2, a@0, b@1`, the expressions are
773///    updated to `a@0 + b@1` and `c@2`.
774///
775/// 2) When `sync_with_child` is `false`:
776///
777///    The function determines how the expression would be updated if a projection
778///    was placed before the plan associated with the expression. If the expression
779///    cannot be rewritten after the projection, it returns `None`. For example,
780///    given the expressions `c@0`, `a@1` and `b@2`, and the projection with
781///    an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes
782///    `a@0`, but `b@2` results in `None` since the projection does not include `b`.
783///
784/// # Errors
785/// This function returns an error if `sync_with_child` is `true` and if any expression references
786/// an index that is out of bounds for `projected_exprs`.
787/// For example:
788///
789/// - `expr` is `a@3`
790/// - `projected_exprs` is \[`a@0`, `b@1`\]
791///
792/// In this case, `a@3` references index 3, which is out of bounds for `projected_exprs` (which has length 2).
793pub fn update_expr(
794    expr: &Arc<dyn PhysicalExpr>,
795    projected_exprs: &[ProjectionExpr],
796    sync_with_child: bool,
797) -> Result<Option<Arc<dyn PhysicalExpr>>> {
798    #[derive(Debug, PartialEq)]
799    enum RewriteState {
800        /// The expression is unchanged.
801        Unchanged,
802        /// Some part of the expression has been rewritten
803        RewrittenValid,
804        /// Some part of the expression has been rewritten, but some column
805        /// references could not be.
806        RewrittenInvalid,
807    }
808
809    let mut state = RewriteState::Unchanged;
810
811    let new_expr = Arc::clone(expr)
812        .transform_up(|expr| {
813            if state == RewriteState::RewrittenInvalid {
814                return Ok(Transformed::no(expr));
815            }
816
817            let Some(column) = expr.as_any().downcast_ref::<Column>() else {
818                return Ok(Transformed::no(expr));
819            };
820            if sync_with_child {
821                state = RewriteState::RewrittenValid;
822                // Update the index of `column`:
823                let projected_expr = projected_exprs.get(column.index()).ok_or_else(|| {
824                    internal_datafusion_err!(
825                        "Column index {} out of bounds for projected expressions of length {}",
826                        column.index(),
827                        projected_exprs.len()
828                    )
829                })?;
830                Ok(Transformed::yes(Arc::clone(&projected_expr.expr)))
831            } else {
832                // default to invalid, in case we can't find the relevant column
833                state = RewriteState::RewrittenInvalid;
834                // Determine how to update `column` to accommodate `projected_exprs`
835                projected_exprs
836                    .iter()
837                    .enumerate()
838                    .find_map(|(index, proj_expr)| {
839                        proj_expr.expr.as_any().downcast_ref::<Column>().and_then(
840                            |projected_column| {
841                                (column.name().eq(projected_column.name())
842                                    && column.index() == projected_column.index())
843                                .then(|| {
844                                    state = RewriteState::RewrittenValid;
845                                    Arc::new(Column::new(&proj_expr.alias, index)) as _
846                                })
847                            },
848                        )
849                    })
850                    .map_or_else(
851                        || Ok(Transformed::no(expr)),
852                        |c| Ok(Transformed::yes(c)),
853                    )
854            }
855        })
856        .data()?;
857
858    match state {
859        RewriteState::RewrittenInvalid => Ok(None),
860        // Both Unchanged and RewrittenValid are valid:
861        // - Unchanged means no columns to rewrite (e.g., literals)
862        // - RewrittenValid means columns were successfully rewritten
863        RewriteState::Unchanged | RewriteState::RewrittenValid => Ok(Some(new_expr)),
864    }
865}
866
867/// Stores target expressions, along with their indices, that associate with a
868/// source expression in a projection mapping.
869#[derive(Clone, Debug, Default)]
870pub struct ProjectionTargets {
871    /// A non-empty vector of pairs of target expressions and their indices.
872    /// Consider using a special non-empty collection type in the future (e.g.
873    /// if Rust provides one in the standard library).
874    exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
875}
876
877impl ProjectionTargets {
878    /// Returns the first target expression and its index.
879    pub fn first(&self) -> &(Arc<dyn PhysicalExpr>, usize) {
880        // Since the vector is non-empty, we can safely unwrap:
881        self.exprs_indices.first().unwrap()
882    }
883
884    /// Adds a target expression and its index to the list of targets.
885    pub fn push(&mut self, target: (Arc<dyn PhysicalExpr>, usize)) {
886        self.exprs_indices.push(target);
887    }
888}
889
890impl Deref for ProjectionTargets {
891    type Target = [(Arc<dyn PhysicalExpr>, usize)];
892
893    fn deref(&self) -> &Self::Target {
894        &self.exprs_indices
895    }
896}
897
898impl From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets {
899    fn from(exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>) -> Self {
900        Self { exprs_indices }
901    }
902}
903
904/// Stores the mapping between source expressions and target expressions for a
905/// projection.
906#[derive(Clone, Debug)]
907pub struct ProjectionMapping {
908    /// Mapping between source expressions and target expressions.
909    /// Vector indices correspond to the indices after projection.
910    map: IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>,
911}
912
913impl ProjectionMapping {
914    /// Constructs the mapping between a projection's input and output
915    /// expressions.
916    ///
917    /// For example, given the input projection expressions (`a + b`, `c + d`)
918    /// and an output schema with two columns `"c + d"` and `"a + b"`, the
919    /// projection mapping would be:
920    ///
921    /// ```text
922    ///  [0]: (c + d, [(col("c + d"), 0)])
923    ///  [1]: (a + b, [(col("a + b"), 1)])
924    /// ```
925    ///
926    /// where `col("c + d")` means the column named `"c + d"`.
927    pub fn try_new(
928        expr: impl IntoIterator<Item = (Arc<dyn PhysicalExpr>, String)>,
929        input_schema: &SchemaRef,
930    ) -> Result<Self> {
931        // Construct a map from the input expressions to the output expression of the projection:
932        let mut map = IndexMap::<_, ProjectionTargets>::new();
933        for (expr_idx, (expr, name)) in expr.into_iter().enumerate() {
934            let target_expr = Arc::new(Column::new(&name, expr_idx)) as _;
935            let source_expr = expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
936                Some(col) => {
937                    // Sometimes, an expression and its name in the input_schema
938                    // doesn't match. This can cause problems, so we make sure
939                    // that the expression name matches with the name in `input_schema`.
940                    // Conceptually, `source_expr` and `expression` should be the same.
941                    let idx = col.index();
942                    let matching_field = input_schema.field(idx);
943                    let matching_name = matching_field.name();
944                    assert_or_internal_err!(
945                        col.name() == matching_name,
946                        "Input field name {matching_name} does not match with the projection expression {}",
947                        col.name()
948                    );
949                    let matching_column = Column::new(matching_name, idx);
950                    Ok(Transformed::yes(Arc::new(matching_column)))
951                }
952                None => Ok(Transformed::no(e)),
953            })
954            .data()?;
955            map.entry(source_expr)
956                .or_default()
957                .push((target_expr, expr_idx));
958        }
959        Ok(Self { map })
960    }
961
962    /// Constructs a subset mapping using the provided indices.
963    ///
964    /// This is used when the output is a subset of the input without any
965    /// other transformations. The indices are for columns in the schema.
966    pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Result<Self> {
967        let projection_exprs = indices.iter().map(|index| {
968            let field = schema.field(*index);
969            let column = Arc::new(Column::new(field.name(), *index));
970            (column as _, field.name().clone())
971        });
972        ProjectionMapping::try_new(projection_exprs, schema)
973    }
974}
975
976impl Deref for ProjectionMapping {
977    type Target = IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>;
978
979    fn deref(&self) -> &Self::Target {
980        &self.map
981    }
982}
983
984impl FromIterator<(Arc<dyn PhysicalExpr>, ProjectionTargets)> for ProjectionMapping {
985    fn from_iter<T: IntoIterator<Item = (Arc<dyn PhysicalExpr>, ProjectionTargets)>>(
986        iter: T,
987    ) -> Self {
988        Self {
989            map: IndexMap::from_iter(iter),
990        }
991    }
992}
993
994/// Projects a slice of [LexOrdering]s onto the given schema.
995///
996/// This is a convenience wrapper that applies [project_ordering] to each
997/// input ordering and collects the successful projections:
998/// - For each input ordering, the result of [project_ordering] is appended to
999///   the output if it is `Some(...)`.
1000/// - Order is preserved and no deduplication is attempted.
1001/// - If none of the input orderings can be projected, an empty `Vec` is
1002///   returned.
1003///
1004/// See [project_ordering] for the semantics of projecting a single
1005/// [LexOrdering].
1006pub fn project_orderings(
1007    orderings: &[LexOrdering],
1008    schema: &SchemaRef,
1009) -> Vec<LexOrdering> {
1010    let mut projected_orderings = vec![];
1011
1012    for ordering in orderings {
1013        projected_orderings.extend(project_ordering(ordering, schema));
1014    }
1015
1016    projected_orderings
1017}
1018
1019/// Projects a single [LexOrdering] onto the given schema.
1020///
1021/// This function attempts to rewrite every [PhysicalSortExpr] in the provided
1022/// [LexOrdering] so that any [Column] expressions point at the correct field
1023/// indices in `schema`.
1024///
1025/// Key details:
1026/// - Columns are matched by name, not by index. The index of each matched
1027///   column is looked up with [Schema::column_with_name](arrow::datatypes::Schema::column_with_name) and a new
1028///   [Column] with the correct [index](Column::index) is substituted.
1029/// - If an expression references a column name that does not exist in
1030///   `schema`, projection of the current ordering stops and only the already
1031///   rewritten prefix is kept. This models the fact that a lexicographical
1032///   ordering remains valid for any leading prefix whose expressions are
1033///   present in the projected schema.
1034/// - If no expressions can be projected (i.e. the first one is missing), the
1035///   function returns `None`.
1036///
1037/// Return value:
1038/// - `Some(LexOrdering)` if at least one sort expression could be projected.
1039///   The returned ordering may be a strict prefix of the input ordering.
1040/// - `None` if no part of the ordering can be projected onto `schema`.
1041///
1042/// Example
1043///
1044/// Suppose we have an input ordering `[col("a@0"), col("b@1")]` but the projected
1045/// schema only contains b and not a. The result will be `Some([col("a@0")])`. In other
1046/// words, the column reference is reindexed to match the projected schema.
1047/// If neither a nor b is present, the result will be None.
1048pub fn project_ordering(
1049    ordering: &LexOrdering,
1050    schema: &SchemaRef,
1051) -> Option<LexOrdering> {
1052    let mut projected_exprs = vec![];
1053    for PhysicalSortExpr { expr, options } in ordering.iter() {
1054        let transformed = Arc::clone(expr).transform_up(|expr| {
1055            let Some(col) = expr.as_any().downcast_ref::<Column>() else {
1056                return Ok(Transformed::no(expr));
1057            };
1058
1059            let name = col.name();
1060            if let Some((idx, _)) = schema.column_with_name(name) {
1061                // Compute the new column expression (with correct index) after projection:
1062                Ok(Transformed::yes(Arc::new(Column::new(name, idx))))
1063            } else {
1064                // Cannot find expression in the projected_schema,
1065                // signal this using an Err result
1066                plan_err!("")
1067            }
1068        });
1069
1070        match transformed {
1071            Ok(transformed) => {
1072                projected_exprs.push(PhysicalSortExpr::new(transformed.data, *options));
1073            }
1074            Err(_) => {
1075                // Err result indicates an expression could not be found in the
1076                // projected_schema, stop iterating since rest of the orderings are violated
1077                break;
1078            }
1079        }
1080    }
1081
1082    LexOrdering::new(projected_exprs)
1083}
1084
1085#[cfg(test)]
1086pub(crate) mod tests {
1087    use std::collections::HashMap;
1088
1089    use super::*;
1090    use crate::equivalence::{EquivalenceProperties, convert_to_orderings};
1091    use crate::expressions::{BinaryExpr, Literal, col};
1092    use crate::utils::tests::TestScalarUDF;
1093    use crate::{PhysicalExprRef, ScalarFunctionExpr};
1094
1095    use arrow::compute::SortOptions;
1096    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
1097    use datafusion_common::config::ConfigOptions;
1098    use datafusion_common::stats::Precision;
1099    use datafusion_common::{ScalarValue, Statistics};
1100    use datafusion_expr::{Operator, ScalarUDF};
1101    use insta::assert_snapshot;
1102
1103    pub(crate) fn output_schema(
1104        mapping: &ProjectionMapping,
1105        input_schema: &Arc<Schema>,
1106    ) -> Result<SchemaRef> {
1107        // Calculate output schema:
1108        let mut fields = vec![];
1109        for (source, targets) in mapping.iter() {
1110            let data_type = source.data_type(input_schema)?;
1111            let nullable = source.nullable(input_schema)?;
1112            for (target, _) in targets.iter() {
1113                let Some(column) = target.as_any().downcast_ref::<Column>() else {
1114                    return plan_err!("Expects to have column");
1115                };
1116                fields.push(Field::new(column.name(), data_type.clone(), nullable));
1117            }
1118        }
1119
1120        let output_schema = Arc::new(Schema::new_with_metadata(
1121            fields,
1122            input_schema.metadata().clone(),
1123        ));
1124
1125        Ok(output_schema)
1126    }
1127
1128    #[test]
1129    fn project_orderings() -> Result<()> {
1130        let schema = Arc::new(Schema::new(vec![
1131            Field::new("a", DataType::Int32, true),
1132            Field::new("b", DataType::Int32, true),
1133            Field::new("c", DataType::Int32, true),
1134            Field::new("d", DataType::Int32, true),
1135            Field::new("e", DataType::Int32, true),
1136            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
1137        ]));
1138        let col_a = &col("a", &schema)?;
1139        let col_b = &col("b", &schema)?;
1140        let col_c = &col("c", &schema)?;
1141        let col_d = &col("d", &schema)?;
1142        let col_e = &col("e", &schema)?;
1143        let col_ts = &col("ts", &schema)?;
1144        let a_plus_b = Arc::new(BinaryExpr::new(
1145            Arc::clone(col_a),
1146            Operator::Plus,
1147            Arc::clone(col_b),
1148        )) as Arc<dyn PhysicalExpr>;
1149        let b_plus_d = Arc::new(BinaryExpr::new(
1150            Arc::clone(col_b),
1151            Operator::Plus,
1152            Arc::clone(col_d),
1153        )) as Arc<dyn PhysicalExpr>;
1154        let b_plus_e = Arc::new(BinaryExpr::new(
1155            Arc::clone(col_b),
1156            Operator::Plus,
1157            Arc::clone(col_e),
1158        )) as Arc<dyn PhysicalExpr>;
1159        let c_plus_d = Arc::new(BinaryExpr::new(
1160            Arc::clone(col_c),
1161            Operator::Plus,
1162            Arc::clone(col_d),
1163        )) as Arc<dyn PhysicalExpr>;
1164
1165        let option_asc = SortOptions {
1166            descending: false,
1167            nulls_first: false,
1168        };
1169        let option_desc = SortOptions {
1170            descending: true,
1171            nulls_first: true,
1172        };
1173
1174        let test_cases = vec![
1175            // ---------- TEST CASE 1 ------------
1176            (
1177                // orderings
1178                vec![
1179                    // [b ASC]
1180                    vec![(col_b, option_asc)],
1181                ],
1182                // projection exprs
1183                vec![(col_b, "b_new".to_string()), (col_a, "a_new".to_string())],
1184                // expected
1185                vec![
1186                    // [b_new ASC]
1187                    vec![("b_new", option_asc)],
1188                ],
1189            ),
1190            // ---------- TEST CASE 2 ------------
1191            (
1192                // orderings
1193                vec![
1194                    // empty ordering
1195                ],
1196                // projection exprs
1197                vec![(col_c, "c_new".to_string()), (col_b, "b_new".to_string())],
1198                // expected
1199                vec![
1200                    // no ordering at the output
1201                ],
1202            ),
1203            // ---------- TEST CASE 3 ------------
1204            (
1205                // orderings
1206                vec![
1207                    // [ts ASC]
1208                    vec![(col_ts, option_asc)],
1209                ],
1210                // projection exprs
1211                vec![
1212                    (col_b, "b_new".to_string()),
1213                    (col_a, "a_new".to_string()),
1214                    (col_ts, "ts_new".to_string()),
1215                ],
1216                // expected
1217                vec![
1218                    // [ts_new ASC]
1219                    vec![("ts_new", option_asc)],
1220                ],
1221            ),
1222            // ---------- TEST CASE 4 ------------
1223            (
1224                // orderings
1225                vec![
1226                    // [a ASC, ts ASC]
1227                    vec![(col_a, option_asc), (col_ts, option_asc)],
1228                    // [b ASC, ts ASC]
1229                    vec![(col_b, option_asc), (col_ts, option_asc)],
1230                ],
1231                // projection exprs
1232                vec![
1233                    (col_b, "b_new".to_string()),
1234                    (col_a, "a_new".to_string()),
1235                    (col_ts, "ts_new".to_string()),
1236                ],
1237                // expected
1238                vec![
1239                    // [a_new ASC, ts_new ASC]
1240                    vec![("a_new", option_asc), ("ts_new", option_asc)],
1241                    // [b_new ASC, ts_new ASC]
1242                    vec![("b_new", option_asc), ("ts_new", option_asc)],
1243                ],
1244            ),
1245            // ---------- TEST CASE 5 ------------
1246            (
1247                // orderings
1248                vec![
1249                    // [a + b ASC]
1250                    vec![(&a_plus_b, option_asc)],
1251                ],
1252                // projection exprs
1253                vec![
1254                    (col_b, "b_new".to_string()),
1255                    (col_a, "a_new".to_string()),
1256                    (&a_plus_b, "a+b".to_string()),
1257                ],
1258                // expected
1259                vec![
1260                    // [a + b ASC]
1261                    vec![("a+b", option_asc)],
1262                ],
1263            ),
1264            // ---------- TEST CASE 6 ------------
1265            (
1266                // orderings
1267                vec![
1268                    // [a + b ASC, c ASC]
1269                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
1270                ],
1271                // projection exprs
1272                vec![
1273                    (col_b, "b_new".to_string()),
1274                    (col_a, "a_new".to_string()),
1275                    (col_c, "c_new".to_string()),
1276                    (&a_plus_b, "a+b".to_string()),
1277                ],
1278                // expected
1279                vec![
1280                    // [a + b ASC, c_new ASC]
1281                    vec![("a+b", option_asc), ("c_new", option_asc)],
1282                ],
1283            ),
1284            // ------- TEST CASE 7 ----------
1285            (
1286                vec![
1287                    // [a ASC, b ASC, c ASC]
1288                    vec![(col_a, option_asc), (col_b, option_asc)],
1289                    // [a ASC, d ASC]
1290                    vec![(col_a, option_asc), (col_d, option_asc)],
1291                ],
1292                // b as b_new, a as a_new, d as d_new b+d
1293                vec![
1294                    (col_b, "b_new".to_string()),
1295                    (col_a, "a_new".to_string()),
1296                    (col_d, "d_new".to_string()),
1297                    (&b_plus_d, "b+d".to_string()),
1298                ],
1299                // expected
1300                vec![
1301                    // [a_new ASC, b_new ASC]
1302                    vec![("a_new", option_asc), ("b_new", option_asc)],
1303                    // [a_new ASC, d_new ASC]
1304                    vec![("a_new", option_asc), ("d_new", option_asc)],
1305                    // [a_new ASC, b+d ASC]
1306                    vec![("a_new", option_asc), ("b+d", option_asc)],
1307                ],
1308            ),
1309            // ------- TEST CASE 8 ----------
1310            (
1311                // orderings
1312                vec![
1313                    // [b+d ASC]
1314                    vec![(&b_plus_d, option_asc)],
1315                ],
1316                // proj exprs
1317                vec![
1318                    (col_b, "b_new".to_string()),
1319                    (col_a, "a_new".to_string()),
1320                    (col_d, "d_new".to_string()),
1321                    (&b_plus_d, "b+d".to_string()),
1322                ],
1323                // expected
1324                vec![
1325                    // [b+d ASC]
1326                    vec![("b+d", option_asc)],
1327                ],
1328            ),
1329            // ------- TEST CASE 9 ----------
1330            (
1331                // orderings
1332                vec![
1333                    // [a ASC, d ASC, b ASC]
1334                    vec![
1335                        (col_a, option_asc),
1336                        (col_d, option_asc),
1337                        (col_b, option_asc),
1338                    ],
1339                    // [c ASC]
1340                    vec![(col_c, option_asc)],
1341                ],
1342                // proj exprs
1343                vec![
1344                    (col_b, "b_new".to_string()),
1345                    (col_a, "a_new".to_string()),
1346                    (col_d, "d_new".to_string()),
1347                    (col_c, "c_new".to_string()),
1348                ],
1349                // expected
1350                vec![
1351                    // [a_new ASC, d_new ASC, b_new ASC]
1352                    vec![
1353                        ("a_new", option_asc),
1354                        ("d_new", option_asc),
1355                        ("b_new", option_asc),
1356                    ],
1357                    // [c_new ASC],
1358                    vec![("c_new", option_asc)],
1359                ],
1360            ),
1361            // ------- TEST CASE 10 ----------
1362            (
1363                vec![
1364                    // [a ASC, b ASC, c ASC]
1365                    vec![
1366                        (col_a, option_asc),
1367                        (col_b, option_asc),
1368                        (col_c, option_asc),
1369                    ],
1370                    // [a ASC, d ASC]
1371                    vec![(col_a, option_asc), (col_d, option_asc)],
1372                ],
1373                // proj exprs
1374                vec![
1375                    (col_b, "b_new".to_string()),
1376                    (col_a, "a_new".to_string()),
1377                    (col_c, "c_new".to_string()),
1378                    (&c_plus_d, "c+d".to_string()),
1379                ],
1380                // expected
1381                vec![
1382                    // [a_new ASC, b_new ASC, c_new ASC]
1383                    vec![
1384                        ("a_new", option_asc),
1385                        ("b_new", option_asc),
1386                        ("c_new", option_asc),
1387                    ],
1388                    // [a_new ASC, b_new ASC, c+d ASC]
1389                    vec![
1390                        ("a_new", option_asc),
1391                        ("b_new", option_asc),
1392                        ("c+d", option_asc),
1393                    ],
1394                ],
1395            ),
1396            // ------- TEST CASE 11 ----------
1397            (
1398                // orderings
1399                vec![
1400                    // [a ASC, b ASC]
1401                    vec![(col_a, option_asc), (col_b, option_asc)],
1402                    // [a ASC, d ASC]
1403                    vec![(col_a, option_asc), (col_d, option_asc)],
1404                ],
1405                // proj exprs
1406                vec![
1407                    (col_b, "b_new".to_string()),
1408                    (col_a, "a_new".to_string()),
1409                    (&b_plus_d, "b+d".to_string()),
1410                ],
1411                // expected
1412                vec![
1413                    // [a_new ASC, b_new ASC]
1414                    vec![("a_new", option_asc), ("b_new", option_asc)],
1415                    // [a_new ASC, b + d ASC]
1416                    vec![("a_new", option_asc), ("b+d", option_asc)],
1417                ],
1418            ),
1419            // ------- TEST CASE 12 ----------
1420            (
1421                // orderings
1422                vec![
1423                    // [a ASC, b ASC, c ASC]
1424                    vec![
1425                        (col_a, option_asc),
1426                        (col_b, option_asc),
1427                        (col_c, option_asc),
1428                    ],
1429                ],
1430                // proj exprs
1431                vec![(col_c, "c_new".to_string()), (col_a, "a_new".to_string())],
1432                // expected
1433                vec![
1434                    // [a_new ASC]
1435                    vec![("a_new", option_asc)],
1436                ],
1437            ),
1438            // ------- TEST CASE 13 ----------
1439            (
1440                // orderings
1441                vec![
1442                    // [a ASC, b ASC, c ASC]
1443                    vec![
1444                        (col_a, option_asc),
1445                        (col_b, option_asc),
1446                        (col_c, option_asc),
1447                    ],
1448                    // [a ASC, a + b ASC, c ASC]
1449                    vec![
1450                        (col_a, option_asc),
1451                        (&a_plus_b, option_asc),
1452                        (col_c, option_asc),
1453                    ],
1454                ],
1455                // proj exprs
1456                vec![
1457                    (col_c, "c_new".to_string()),
1458                    (col_b, "b_new".to_string()),
1459                    (col_a, "a_new".to_string()),
1460                    (&a_plus_b, "a+b".to_string()),
1461                ],
1462                // expected
1463                vec![
1464                    // [a_new ASC, b_new ASC, c_new ASC]
1465                    vec![
1466                        ("a_new", option_asc),
1467                        ("b_new", option_asc),
1468                        ("c_new", option_asc),
1469                    ],
1470                    // [a_new ASC, a+b ASC, c_new ASC]
1471                    vec![
1472                        ("a_new", option_asc),
1473                        ("a+b", option_asc),
1474                        ("c_new", option_asc),
1475                    ],
1476                ],
1477            ),
1478            // ------- TEST CASE 14 ----------
1479            (
1480                // orderings
1481                vec![
1482                    // [a ASC, b ASC]
1483                    vec![(col_a, option_asc), (col_b, option_asc)],
1484                    // [c ASC, b ASC]
1485                    vec![(col_c, option_asc), (col_b, option_asc)],
1486                    // [d ASC, e ASC]
1487                    vec![(col_d, option_asc), (col_e, option_asc)],
1488                ],
1489                // proj exprs
1490                vec![
1491                    (col_c, "c_new".to_string()),
1492                    (col_d, "d_new".to_string()),
1493                    (col_a, "a_new".to_string()),
1494                    (&b_plus_e, "b+e".to_string()),
1495                ],
1496                // expected
1497                vec![
1498                    // [a_new ASC, d_new ASC, b+e ASC]
1499                    vec![
1500                        ("a_new", option_asc),
1501                        ("d_new", option_asc),
1502                        ("b+e", option_asc),
1503                    ],
1504                    // [d_new ASC, a_new ASC, b+e ASC]
1505                    vec![
1506                        ("d_new", option_asc),
1507                        ("a_new", option_asc),
1508                        ("b+e", option_asc),
1509                    ],
1510                    // [c_new ASC, d_new ASC, b+e ASC]
1511                    vec![
1512                        ("c_new", option_asc),
1513                        ("d_new", option_asc),
1514                        ("b+e", option_asc),
1515                    ],
1516                    // [d_new ASC, c_new ASC, b+e ASC]
1517                    vec![
1518                        ("d_new", option_asc),
1519                        ("c_new", option_asc),
1520                        ("b+e", option_asc),
1521                    ],
1522                ],
1523            ),
1524            // ------- TEST CASE 15 ----------
1525            (
1526                // orderings
1527                vec![
1528                    // [a ASC, c ASC, b ASC]
1529                    vec![
1530                        (col_a, option_asc),
1531                        (col_c, option_asc),
1532                        (col_b, option_asc),
1533                    ],
1534                ],
1535                // proj exprs
1536                vec![
1537                    (col_c, "c_new".to_string()),
1538                    (col_a, "a_new".to_string()),
1539                    (&a_plus_b, "a+b".to_string()),
1540                ],
1541                // expected
1542                vec![
1543                    // [a_new ASC, d_new ASC, b+e ASC]
1544                    vec![
1545                        ("a_new", option_asc),
1546                        ("c_new", option_asc),
1547                        ("a+b", option_asc),
1548                    ],
1549                ],
1550            ),
1551            // ------- TEST CASE 16 ----------
1552            (
1553                // orderings
1554                vec![
1555                    // [a ASC, b ASC]
1556                    vec![(col_a, option_asc), (col_b, option_asc)],
1557                    // [c ASC, b DESC]
1558                    vec![(col_c, option_asc), (col_b, option_desc)],
1559                    // [e ASC]
1560                    vec![(col_e, option_asc)],
1561                ],
1562                // proj exprs
1563                vec![
1564                    (col_c, "c_new".to_string()),
1565                    (col_a, "a_new".to_string()),
1566                    (col_b, "b_new".to_string()),
1567                    (&b_plus_e, "b+e".to_string()),
1568                ],
1569                // expected
1570                vec![
1571                    // [a_new ASC, b_new ASC]
1572                    vec![("a_new", option_asc), ("b_new", option_asc)],
1573                    // [a_new ASC, b+e ASC]
1574                    vec![("a_new", option_asc), ("b+e", option_asc)],
1575                    // [c_new ASC, b_new DESC]
1576                    vec![("c_new", option_asc), ("b_new", option_desc)],
1577                ],
1578            ),
1579        ];
1580
1581        for (idx, (orderings, proj_exprs, expected)) in test_cases.into_iter().enumerate()
1582        {
1583            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
1584
1585            let orderings = convert_to_orderings(&orderings);
1586            eq_properties.add_orderings(orderings);
1587
1588            let proj_exprs = proj_exprs
1589                .into_iter()
1590                .map(|(expr, name)| (Arc::clone(expr), name));
1591            let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
1592            let output_schema = output_schema(&projection_mapping, &schema)?;
1593
1594            let expected = expected
1595                .into_iter()
1596                .map(|ordering| {
1597                    ordering
1598                        .into_iter()
1599                        .map(|(name, options)| {
1600                            (col(name, &output_schema).unwrap(), options)
1601                        })
1602                        .collect::<Vec<_>>()
1603                })
1604                .collect::<Vec<_>>();
1605            let expected = convert_to_orderings(&expected);
1606
1607            let projected_eq = eq_properties.project(&projection_mapping, output_schema);
1608            let orderings = projected_eq.oeq_class();
1609
1610            let err_msg = format!(
1611                "test_idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
1612            );
1613
1614            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
1615            for expected_ordering in &expected {
1616                assert!(orderings.contains(expected_ordering), "{}", err_msg)
1617            }
1618        }
1619
1620        Ok(())
1621    }
1622
// Pushes several input orderings through one fixed projection
// (`b`, `a`, `c` and a test scalar UDF applied to `c`) and verifies that the
// projected equivalence properties contain exactly the expected orderings.
#[test]
fn project_orderings2() -> Result<()> {
    let fields = vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, true),
        Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
    ];
    let schema = Arc::new(Schema::new(fields));

    // Expressions over the input schema.
    let col_a = &col("a", &schema)?;
    let col_b = &col("b", &schema)?;
    let col_c = &col("c", &schema)?;
    let col_ts = &col("ts", &schema)?;
    let a_plus_b: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::clone(col_a),
        Operator::Plus,
        Arc::clone(col_b),
    ));
    // Test scalar UDF applied to `c`.
    let round_c: PhysicalExprRef = Arc::new(ScalarFunctionExpr::try_new(
        Arc::new(ScalarUDF::new_from_impl(TestScalarUDF::new())),
        vec![Arc::clone(col_c)],
        &schema,
        Arc::new(ConfigOptions::default()),
    )?);

    let asc = SortOptions {
        descending: false,
        nulls_first: false,
    };

    // Projection: b AS b_new, a AS a_new, c AS c_new, udf(c) AS round_c_res
    let proj_exprs = [
        (col_b, "b_new"),
        (col_a, "a_new"),
        (col_c, "c_new"),
        (&round_c, "round_c_res"),
    ]
    .into_iter()
    .map(|(expr, alias)| (Arc::clone(expr), alias.to_string()));
    let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
    let output_schema = output_schema(&projection_mapping, &schema)?;

    // Expressions over the projected (output) schema.
    let col_a_new = &col("a_new", &output_schema)?;
    let col_b_new = &col("b_new", &output_schema)?;
    let col_c_new = &col("c_new", &output_schema)?;
    let col_round_c_res = &col("round_c_res", &output_schema)?;
    let a_new_plus_b_new: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::clone(col_a_new),
        Operator::Plus,
        Arc::clone(col_b_new),
    ));

    // Each case is (input orderings, expected projected orderings).
    let test_cases = [
        // ---------- TEST CASE 1 ------------
        (
            // input: [a ASC]
            vec![vec![(col_a, asc)]],
            // expected: [a_new ASC]
            vec![vec![(col_a_new, asc)]],
        ),
        // ---------- TEST CASE 2 ------------
        (
            // input: [a+b ASC]
            vec![vec![(&a_plus_b, asc)]],
            // expected: [a_new+b_new ASC]
            vec![vec![(&a_new_plus_b_new, asc)]],
        ),
        // ---------- TEST CASE 3 ------------
        (
            // input: [a ASC, ts ASC]
            vec![vec![(col_a, asc), (col_ts, asc)]],
            // expected: [a_new ASC] (`ts` is not part of the projection)
            vec![vec![(col_a_new, asc)]],
        ),
        // ---------- TEST CASE 4 ------------
        (
            // input: [a ASC, ts ASC, b ASC]
            vec![vec![(col_a, asc), (col_ts, asc), (col_b, asc)]],
            // expected: [a_new ASC] (`ts` is not part of the projection)
            vec![vec![(col_a_new, asc)]],
        ),
        // ---------- TEST CASE 5 ------------
        (
            // input: [a ASC, c ASC]
            vec![vec![(col_a, asc), (col_c, asc)]],
            // expected
            vec![
                // [a_new ASC, round_c_res ASC]
                vec![(col_a_new, asc), (col_round_c_res, asc)],
                // [a_new ASC, c_new ASC]
                vec![(col_a_new, asc), (col_c_new, asc)],
            ],
        ),
        // ---------- TEST CASE 6 ------------
        (
            // input: [c ASC, b ASC]
            vec![vec![(col_c, asc), (col_b, asc)]],
            // expected
            vec![
                // [round_c_res ASC]
                vec![(col_round_c_res, asc)],
                // [c_new ASC, b_new ASC]
                vec![(col_c_new, asc), (col_b_new, asc)],
            ],
        ),
        // ---------- TEST CASE 7 ------------
        (
            // input: [a+b ASC, c ASC]
            vec![vec![(&a_plus_b, asc), (col_c, asc)]],
            // expected
            vec![
                // [a_new+b_new ASC, round_c_res ASC]
                vec![(&a_new_plus_b_new, asc), (col_round_c_res, asc)],
                // [a_new+b_new ASC, c_new ASC]
                vec![(&a_new_plus_b_new, asc), (col_c_new, asc)],
            ],
        ),
    ];

    for (idx, (input_orderings, expected_pairs)) in test_cases.iter().enumerate() {
        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
        eq_properties.add_orderings(convert_to_orderings(input_orderings));

        let expected = convert_to_orderings(expected_pairs);

        let projected_eq =
            eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
        let orderings = projected_eq.oeq_class();

        let err_msg = format!(
            "test idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
        );

        // Same number of orderings, and every expected ordering is present.
        assert_eq!(orderings.len(), expected.len(), "{err_msg}");
        assert!(
            expected.iter().all(|ordering| orderings.contains(ordering)),
            "{err_msg}"
        );
    }
    Ok(())
}
1808
// Projects orderings through `c AS c_new, d AS d_new, a + b AS "a+b"`,
// optionally after registering equalities between input columns, and checks
// that the projected equivalence properties contain exactly the expected
// orderings. The failure message includes the index of the failing case.
#[test]
fn project_orderings3() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
        Field::new("d", DataType::Int32, true),
        Field::new("e", DataType::Int32, true),
        Field::new("f", DataType::Int32, true),
    ]));
    // Expressions over the input schema.
    let col_a = &col("a", &schema)?;
    let col_b = &col("b", &schema)?;
    let col_c = &col("c", &schema)?;
    let col_d = &col("d", &schema)?;
    let col_e = &col("e", &schema)?;
    let col_f = &col("f", &schema)?;
    let a_plus_b = Arc::new(BinaryExpr::new(
        Arc::clone(col_a),
        Operator::Plus,
        Arc::clone(col_b),
    )) as Arc<dyn PhysicalExpr>;

    let option_asc = SortOptions {
        descending: false,
        nulls_first: false,
    };

    // Projection: c AS c_new, d AS d_new, a + b AS "a+b"
    let proj_exprs = vec![
        (col_c, "c_new".to_string()),
        (col_d, "d_new".to_string()),
        (&a_plus_b, "a+b".to_string()),
    ];
    let proj_exprs = proj_exprs
        .into_iter()
        .map(|(expr, name)| (Arc::clone(expr), name));
    let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
    let output_schema = output_schema(&projection_mapping, &schema)?;

    // Expressions over the projected (output) schema.
    let col_a_plus_b_new = &col("a+b", &output_schema)?;
    let col_c_new = &col("c_new", &output_schema)?;
    let col_d_new = &col("d_new", &output_schema)?;

    // Each case is (input orderings, equal column pairs, expected orderings).
    let test_cases = vec![
        // ---------- TEST CASE 1 ------------
        (
            // orderings
            vec![
                // [d ASC, b ASC]
                vec![(col_d, option_asc), (col_b, option_asc)],
                // [c ASC, a ASC]
                vec![(col_c, option_asc), (col_a, option_asc)],
            ],
            // equal conditions
            vec![],
            // expected
            vec![
                // [d_new ASC, c_new ASC, a+b ASC]
                vec![
                    (col_d_new, option_asc),
                    (col_c_new, option_asc),
                    (col_a_plus_b_new, option_asc),
                ],
                // [c_new ASC, d_new ASC, a+b ASC]
                vec![
                    (col_c_new, option_asc),
                    (col_d_new, option_asc),
                    (col_a_plus_b_new, option_asc),
                ],
            ],
        ),
        // ---------- TEST CASE 2 ------------
        (
            // orderings
            vec![
                // [d ASC, b ASC]
                vec![(col_d, option_asc), (col_b, option_asc)],
                // [c ASC, e ASC], Please note that a=e
                vec![(col_c, option_asc), (col_e, option_asc)],
            ],
            // equal conditions
            vec![(col_e, col_a)],
            // expected
            vec![
                // [d_new ASC, c_new ASC, a+b ASC]
                vec![
                    (col_d_new, option_asc),
                    (col_c_new, option_asc),
                    (col_a_plus_b_new, option_asc),
                ],
                // [c_new ASC, d_new ASC, a+b ASC]
                vec![
                    (col_c_new, option_asc),
                    (col_d_new, option_asc),
                    (col_a_plus_b_new, option_asc),
                ],
            ],
        ),
        // ---------- TEST CASE 3 ------------
        (
            // orderings
            vec![
                // [d ASC, b ASC]
                vec![(col_d, option_asc), (col_b, option_asc)],
                // [c ASC, e ASC], Please note that a=f
                vec![(col_c, option_asc), (col_e, option_asc)],
            ],
            // equal conditions
            vec![(col_a, col_f)],
            // expected
            vec![
                // [d_new ASC]
                vec![(col_d_new, option_asc)],
                // [c_new ASC]
                vec![(col_c_new, option_asc)],
            ],
        ),
    ];
    for (idx, (orderings, equal_columns, expected)) in
        test_cases.into_iter().enumerate()
    {
        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
        // Register the equalities before adding the orderings.
        for (lhs, rhs) in equal_columns {
            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?;
        }

        let orderings = convert_to_orderings(&orderings);
        eq_properties.add_orderings(orderings);

        let expected = convert_to_orderings(&expected);

        let projected_eq =
            eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
        let orderings = projected_eq.oeq_class();

        // Include the case index so a failure identifies which case broke.
        let err_msg = format!(
            "test idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
        );

        assert_eq!(orderings.len(), expected.len(), "{err_msg}");
        for expected_ordering in &expected {
            assert!(orderings.contains(expected_ordering), "{err_msg}");
        }
    }

    Ok(())
}
1953
1954    fn get_stats() -> Statistics {
1955        Statistics {
1956            num_rows: Precision::Exact(5),
1957            total_byte_size: Precision::Exact(23),
1958            column_statistics: vec![
1959                ColumnStatistics {
1960                    distinct_count: Precision::Exact(5),
1961                    max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
1962                    min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
1963                    sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
1964                    null_count: Precision::Exact(0),
1965                    byte_size: Precision::Absent,
1966                },
1967                ColumnStatistics {
1968                    distinct_count: Precision::Exact(1),
1969                    max_value: Precision::Exact(ScalarValue::from("x")),
1970                    min_value: Precision::Exact(ScalarValue::from("a")),
1971                    sum_value: Precision::Absent,
1972                    null_count: Precision::Exact(3),
1973                    byte_size: Precision::Absent,
1974                },
1975                ColumnStatistics {
1976                    distinct_count: Precision::Absent,
1977                    max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
1978                    min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
1979                    sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
1980                    null_count: Precision::Absent,
1981                    byte_size: Precision::Absent,
1982                },
1983            ],
1984        }
1985    }
1986
1987    fn get_schema() -> Schema {
1988        let field_0 = Field::new("col0", DataType::Int64, false);
1989        let field_1 = Field::new("col1", DataType::Utf8, false);
1990        let field_2 = Field::new("col2", DataType::Float32, false);
1991        Schema::new(vec![field_0, field_1, field_2])
1992    }
1993
// Projecting plain columns (`col1`, then `col0`) must reorder the column
// statistics accordingly and drop the statistics of the unprojected column.
#[test]
fn test_stats_projection_columns_only() {
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr {
            expr: Arc::new(Column::new("col1", 1)),
            alias: String::from("col1"),
        },
        ProjectionExpr {
            expr: Arc::new(Column::new("col0", 0)),
            alias: String::from("col0"),
        },
    ]);

    let projected_schema = projection.project_schema(&get_schema()).unwrap();
    let result = projection
        .project_statistics(get_stats(), &projected_schema)
        .unwrap();

    // Statistics of `col1` followed by those of `col0`.
    let expected_columns = vec![
        ColumnStatistics {
            distinct_count: Precision::Exact(1),
            max_value: Precision::Exact(ScalarValue::from("x")),
            min_value: Precision::Exact(ScalarValue::from("a")),
            sum_value: Precision::Absent,
            null_count: Precision::Exact(3),
            byte_size: Precision::Absent,
        },
        ColumnStatistics {
            distinct_count: Precision::Exact(5),
            max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
            min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
            sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
            null_count: Precision::Exact(0),
            byte_size: Precision::Absent,
        },
    ];
    let expected = Statistics {
        num_rows: Precision::Exact(5),
        // The projection contains a variable-length Utf8 column, so the exact
        // byte size cannot be computed after projection: the original
        // Exact(23) is expected to be reported as Inexact(23).
        total_byte_size: Precision::Inexact(23),
        column_statistics: expected_columns,
    };

    assert_eq!(result, expected);
}
2041
// Projecting only fixed-width columns (`col2`: Float32, `col0`: Int64) is
// expected to keep `total_byte_size` exact.
#[test]
fn test_stats_projection_column_with_primitive_width_only() {
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr {
            expr: Arc::new(Column::new("col2", 2)),
            alias: String::from("col2"),
        },
        ProjectionExpr {
            expr: Arc::new(Column::new("col0", 0)),
            alias: String::from("col0"),
        },
    ]);

    let projected_schema = projection.project_schema(&get_schema()).unwrap();
    let result = projection
        .project_statistics(get_stats(), &projected_schema)
        .unwrap();

    // Statistics of `col2` followed by those of `col0`.
    let expected_columns = vec![
        ColumnStatistics {
            distinct_count: Precision::Absent,
            max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
            min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
            sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
            null_count: Precision::Absent,
            byte_size: Precision::Absent,
        },
        ColumnStatistics {
            distinct_count: Precision::Exact(5),
            max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
            min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
            sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
            null_count: Precision::Exact(0),
            byte_size: Precision::Absent,
        },
    ];
    let expected = Statistics {
        num_rows: Precision::Exact(5),
        // NOTE(review): presumably 5 rows * (4-byte Float32 + 8-byte Int64);
        // confirm against `project_statistics`.
        total_byte_size: Precision::Exact(60),
        column_statistics: expected_columns,
    };

    assert_eq!(result, expected);
}
2087
2088    // Tests for the `ProjectionExprs` struct
2089
// `ProjectionExprs::new` keeps every expression it is given.
#[test]
fn test_projection_new() -> Result<()> {
    let exprs = vec![
        ProjectionExpr {
            expr: Arc::new(Column::new("a", 0)),
            alias: "a".to_string(),
        },
        ProjectionExpr {
            expr: Arc::new(Column::new("b", 1)),
            alias: "b".to_string(),
        },
    ];
    // `exprs` is not used afterwards, so it is moved instead of cloned.
    let projection = ProjectionExprs::new(exprs);
    assert_eq!(projection.as_ref().len(), 2);
    Ok(())
}
2106
// A `Vec<ProjectionExpr>` converts into `ProjectionExprs` via `Into`.
#[test]
fn test_projection_from_vec() -> Result<()> {
    let exprs = vec![ProjectionExpr {
        expr: Arc::new(Column::new("x", 0)),
        alias: "x".to_string(),
    }];
    // `exprs` is not used afterwards, so it is converted by value without cloning.
    let projection: ProjectionExprs = exprs.into();
    assert_eq!(projection.as_ref().len(), 1);
    Ok(())
}
2117
// `ProjectionExprs` can be viewed as a slice of `ProjectionExpr` via `as_ref`.
#[test]
fn test_projection_as_ref() -> Result<()> {
    let projection = ProjectionExprs::new(vec![
        ProjectionExpr {
            expr: Arc::new(Column::new("col1", 0)),
            alias: String::from("col1"),
        },
        ProjectionExpr {
            expr: Arc::new(Column::new("col2", 1)),
            alias: String::from("col2"),
        },
    ]);

    let slice: &[ProjectionExpr] = projection.as_ref();
    assert_eq!(slice.len(), 2);
    Ok(())
}
2135
// Columns projected in reverse index order must still yield sorted indices.
#[test]
fn test_column_indices_multiple_columns() -> Result<()> {
    // Builds a projection entry for a plain column aliased to its own name.
    let column = |name: &str, index: usize| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: name.to_string(),
    };

    let projection =
        ProjectionExprs::new(vec![column("c", 5), column("b", 2), column("a", 0)]);

    // Indices come back sorted regardless of the projection order.
    let indices = projection.column_indices();
    assert_eq!(indices, vec![0, 2, 5]);
    Ok(())
}
2157
// A column index referenced by several projection entries is reported once.
#[test]
fn test_column_indices_duplicates() -> Result<()> {
    // Builds a projection entry for a plain column aliased to its own name.
    let column = |name: &str, index: usize| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: name.to_string(),
    };

    let projection = ProjectionExprs::new(vec![
        column("a", 1),
        column("b", 3),
        // Same index as "a": must not appear twice in the result.
        column("a2", 1),
    ]);

    let indices = projection.column_indices();
    assert_eq!(indices, vec![1, 3]);
    Ok(())
}
2178
// Column indices given in arbitrary order are returned sorted.
#[test]
fn test_column_indices_unsorted() -> Result<()> {
    // Builds a projection entry for a plain column aliased to its own name.
    let column = |name: &str, index: usize| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: name.to_string(),
    };

    let projection =
        ProjectionExprs::new(vec![column("c", 5), column("a", 1), column("b", 3)]);

    let indices = projection.column_indices();
    assert_eq!(indices, vec![1, 3, 5]);
    Ok(())
}
2199
// Columns referenced inside a compound expression are collected as well.
#[test]
fn test_column_indices_complex_expr() -> Result<()> {
    // a@1 + b@4
    let sum: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 1)),
        Operator::Plus,
        Arc::new(Column::new("b", 4)),
    ));
    let plain_column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 2));

    let projection = ProjectionExprs::new(vec![
        ProjectionExpr {
            expr: sum,
            alias: String::from("sum"),
        },
        ProjectionExpr {
            expr: plain_column,
            alias: String::from("c"),
        },
    ]);

    // Every referenced column, sorted and deduplicated: [1, 2, 4].
    let indices = projection.column_indices();
    assert_eq!(indices, vec![1, 2, 4]);
    Ok(())
}
2222
// An empty projection references no columns.
#[test]
fn test_column_indices_empty() -> Result<()> {
    let projection = ProjectionExprs::new(vec![]);
    let no_indices: Vec<usize> = Vec::new();
    assert_eq!(projection.column_indices(), no_indices);
    Ok(())
}
2229
// Merging two column-only projections rewrites the top projection's column
// references in terms of the base projection's inputs.
#[test]
fn test_merge_simple_columns() -> Result<()> {
    // Builds `name@index AS alias`.
    let project = |name: &str, index: usize, alias: &str| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: alias.to_string(),
    };

    // Base: SELECT c@2 AS x, b@1 AS y, a@0 AS z
    let base_projection = ProjectionExprs::new(vec![
        project("c", 2, "x"),
        project("b", 1, "y"),
        project("a", 0, "z"),
    ]);

    // Top: SELECT y@1 AS col2, x@0 AS col1
    let top_projection =
        ProjectionExprs::new(vec![project("y", 1, "col2"), project("x", 0, "col1")]);

    // Expected merge result: SELECT b@1 AS col2, c@2 AS col1
    let merged = base_projection.try_merge(&top_projection)?;
    assert_snapshot!(merged.to_string(), @"Projection[b@1 AS col2, c@2 AS col1]");

    Ok(())
}
2266
// Merging a projection of compound expressions on top of a column-only
// projection rewrites the column references inside those expressions.
#[test]
fn test_merge_with_expressions() -> Result<()> {
    // Builds `name@index AS alias`.
    let project = |name: &str, index: usize, alias: &str| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: alias.to_string(),
    };

    // Base: SELECT c@2 AS x, b@1 AS y, a@0 AS z
    let base_projection = ProjectionExprs::new(vec![
        project("c", 2, "x"),
        project("b", 1, "y"),
        project("a", 0, "z"),
    ]);

    // y@1 + z@2
    let y_plus_z: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("y", 1)),
        Operator::Plus,
        Arc::new(Column::new("z", 2)),
    ));
    // x@0 + 1
    let x_plus_one: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
    ));

    // Top: SELECT y@1 + z@2 AS c2, x@0 + 1 AS c1
    let top_projection = ProjectionExprs::new(vec![
        ProjectionExpr {
            expr: y_plus_z,
            alias: String::from("c2"),
        },
        ProjectionExpr {
            expr: x_plus_one,
            alias: String::from("c1"),
        },
    ]);

    // Expected merge result: SELECT b@1 + a@0 AS c2, c@2 + 1 AS c1
    let merged = base_projection.try_merge(&top_projection)?;
    assert_snapshot!(merged.to_string(), @"Projection[b@1 + a@0 AS c2, c@2 + 1 AS c1]");

    Ok(())
}
2311
// `try_merge` must fail when the top projection references a column index
// that the base projection does not provide.
#[test]
fn try_merge_error() {
    // Base projection with two expressions: a@0 AS x, b@1 AS y.
    let base = ProjectionExprs::new(vec![
        ProjectionExpr {
            expr: Arc::new(Column::new("a", 0)),
            alias: String::from("x"),
        },
        ProjectionExpr {
            expr: Arc::new(Column::new("b", 1)),
            alias: String::from("y"),
        },
    ]);

    // Top projection referencing index 5, which the base does not have.
    let invalid_reference = ProjectionExpr {
        expr: Arc::new(Column::new("z", 5)),
        alias: String::from("result"),
    };
    let top = ProjectionExprs::new(vec![invalid_reference]);

    // The merge must fail with the out-of-bounds message.
    let expected_fragment = "Internal error: Column index 5 out of bounds for projected expressions of length 2";
    let err_msg = base.try_merge(&top).unwrap_err().to_string();
    assert!(
        err_msg.contains(expected_fragment),
        "Unexpected error message: {err_msg}",
    );
}
2339
// Reproduces the issue from the roundtrip_empty_projection test: a query such
// as `SELECT 1 FROM table`, where the file scan needs no columns (empty base
// projection) but a literal is projected on top.
#[test]
fn test_merge_empty_projection_with_literal() -> Result<()> {
    // No columns are needed from the file.
    let base_projection = ProjectionExprs::new(vec![]);

    // SELECT 1
    let one = ProjectionExpr {
        alias: "Int64(1)".to_string(),
        expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
    };
    let top_projection = ProjectionExprs::new(vec![one]);

    // A literal references no columns, so merging with an empty projection
    // should succeed and leave it unchanged.
    let merged = base_projection.try_merge(&top_projection)?;
    assert_snapshot!(merged.to_string(), @"Projection[1 AS Int64(1)]");

    Ok(())
}
2363
// `update_expr` must handle an expression that has no column references.
#[test]
fn test_update_expr_with_literal() -> Result<()> {
    let expected_value = ScalarValue::Int64(Some(42));
    let literal_expr: Arc<dyn PhysicalExpr> =
        Arc::new(Literal::new(ScalarValue::Int64(Some(42))));
    let no_exprs: Vec<ProjectionExpr> = Vec::new();

    // With nothing to substitute, the literal should come back as-is.
    let rewritten = update_expr(&literal_expr, &no_exprs, true)?
        .expect("Literal expression should be valid");

    let literal = rewritten.as_any().downcast_ref::<Literal>().unwrap();
    assert_eq!(literal.value(), &expected_value);

    Ok(())
}
2387
// `update_expr` on an expression mixing a literal and a column: only the
// column reference should be rewritten.
#[test]
fn test_update_expr_with_complex_literal_expr() -> Result<()> {
    // 10 + x@0
    let ten: Arc<dyn PhysicalExpr> =
        Arc::new(Literal::new(ScalarValue::Int64(Some(10))));
    let x_ref: Arc<dyn PhysicalExpr> = Arc::new(Column::new("x", 0));
    let expr: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(ten, Operator::Plus, x_ref));

    // The base projection exposes a@5 under the alias x at position 0.
    let base_projection = vec![ProjectionExpr {
        alias: "x".to_string(),
        expr: Arc::new(Column::new("a", 5)),
    }];

    // Expected rewrite: 10 + x@0 becomes 10 + a@5
    let updated =
        update_expr(&expr, &base_projection, true)?.expect("Expression should be valid");
    let binary = updated
        .as_any()
        .downcast_ref::<BinaryExpr>()
        .expect("Should be a BinaryExpr");

    // The literal operand stays on the left.
    assert!(binary.left().as_any().is::<Literal>());

    // The column operand now points at index 5.
    let right_col = binary
        .right()
        .as_any()
        .downcast_ref::<Column>()
        .expect("Right should be a Column");
    assert_eq!(right_col.index(), 5);

    Ok(())
}
2427
// Projecting plain columns should rename and reorder the schema fields.
#[test]
fn test_project_schema_simple_columns() -> Result<()> {
    // Input schema: [col0: Int64, col1: Utf8, col2: Float32]
    let input_schema = get_schema();

    // SELECT col2 AS c, col0 AS a
    let pick = |name: &str, index: usize, alias: &str| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: alias.to_string(),
    };
    let projection = ProjectionExprs::new(vec![pick("col2", 2, "c"), pick("col0", 0, "a")]);

    let output_schema = projection.project_schema(&input_schema)?;

    // One output field per projected expression, in projection order.
    assert_eq!(output_schema.fields().len(), 2);
    let expected = [("c", DataType::Float32), ("a", DataType::Int64)];
    for (position, (name, data_type)) in expected.into_iter().enumerate() {
        let field = output_schema.field(position);
        assert_eq!(field.name(), name);
        assert_eq!(field.data_type(), &data_type);
    }

    Ok(())
}
2460
// Projecting a computed expression should yield a field named after the
// alias and typed after the expression result.
#[test]
fn test_project_schema_with_expressions() -> Result<()> {
    // Input schema: [col0: Int64, col1: Utf8, col2: Float32]
    let input_schema = get_schema();

    // SELECT col0 + 1 AS incremented
    let col0_plus_one = BinaryExpr::new(
        Arc::new(Column::new("col0", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
    );
    let incremented = ProjectionExpr {
        alias: "incremented".to_string(),
        expr: Arc::new(col0_plus_one),
    };
    let projection = ProjectionExprs::new(vec![incremented]);

    let output_schema = projection.project_schema(&input_schema)?;

    // Exactly one field: "incremented" of type Int64.
    assert_eq!(output_schema.fields().len(), 1);
    let only_field = output_schema.field(0);
    assert_eq!(only_field.name(), "incremented");
    assert_eq!(only_field.data_type(), &DataType::Int64);

    Ok(())
}
2487
// Field metadata must survive a projection that only renames the column.
#[test]
fn test_project_schema_preserves_metadata() -> Result<()> {
    // Input schema whose first field carries metadata.
    let metadata = HashMap::from([("key".to_string(), "value".to_string())]);
    let input_schema = Schema::new(vec![
        Field::new("col0", DataType::Int64, false).with_metadata(metadata.clone()),
        Field::new("col1", DataType::Utf8, false),
    ]);

    // SELECT col0 AS renamed
    let renamed = ProjectionExpr {
        alias: "renamed".to_string(),
        expr: Arc::new(Column::new("col0", 0)),
    };
    let projection = ProjectionExprs::new(vec![renamed]);

    let output_schema = projection.project_schema(&input_schema)?;

    // One field, carrying the new name and the original metadata.
    assert_eq!(output_schema.fields().len(), 1);
    let only_field = output_schema.field(0);
    assert_eq!(only_field.name(), "renamed");
    assert_eq!(only_field.metadata(), &metadata);

    Ok(())
}
2517
// An empty projection should produce a schema with no fields.
#[test]
fn test_project_schema_empty() -> Result<()> {
    let input_schema = get_schema();
    let output_schema = ProjectionExprs::new(vec![]).project_schema(&input_schema)?;

    let field_count = output_schema.fields().len();
    assert_eq!(field_count, 0);

    Ok(())
}
2529
// Projecting plain columns should carry their statistics over, reordered to
// match the projection.
#[test]
fn test_project_statistics_columns_only() -> Result<()> {
    let input_stats = get_stats();
    let input_schema = get_schema();

    // SELECT col1 AS text, col0 AS num
    let by_column = |name: &str, index: usize, alias: &str| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: alias.to_string(),
    };
    let projection = ProjectionExprs::new(vec![
        by_column("col1", 1, "text"),
        by_column("col0", 0, "num"),
    ]);

    let output_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(input_stats, &output_schema)?;

    // The row count is unchanged and there is one statistics entry per output.
    assert_eq!(output_stats.num_rows, Precision::Exact(5));
    assert_eq!(output_stats.column_statistics.len(), 2);

    // Output 0 comes from input col1.
    let text_stats = &output_stats.column_statistics[0];
    assert_eq!(text_stats.distinct_count, Precision::Exact(1));
    assert_eq!(
        text_stats.max_value,
        Precision::Exact(ScalarValue::from("x"))
    );

    // Output 1 comes from input col0.
    let num_stats = &output_stats.column_statistics[1];
    assert_eq!(num_stats.distinct_count, Precision::Exact(5));
    assert_eq!(
        num_stats.max_value,
        Precision::Exact(ScalarValue::Int64(Some(21)))
    );

    Ok(())
}
2580
// A computed expression gets unknown statistics, while a plain column next to
// it keeps the statistics of its source.
#[test]
fn test_project_statistics_with_expressions() -> Result<()> {
    let input_stats = get_stats();
    let input_schema = get_schema();

    // SELECT col0 + 1 AS incremented, col1 AS text
    let incremented = ProjectionExpr {
        alias: "incremented".to_string(),
        expr: Arc::new(BinaryExpr::new(
            Arc::new(Column::new("col0", 0)),
            Operator::Plus,
            Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
        )),
    };
    let text = ProjectionExpr {
        alias: "text".to_string(),
        expr: Arc::new(Column::new("col1", 1)),
    };
    let projection = ProjectionExprs::new(vec![incremented, text]);

    let output_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(input_stats, &output_schema)?;

    // The row count is unchanged and there is one statistics entry per output.
    assert_eq!(output_stats.num_rows, Precision::Exact(5));
    assert_eq!(output_stats.column_statistics.len(), 2);

    // The expression output has no known statistics.
    let expr_stats = &output_stats.column_statistics[0];
    assert_eq!(expr_stats.distinct_count, Precision::Absent);
    assert_eq!(expr_stats.max_value, Precision::Absent);

    // The col1 output keeps its statistics.
    let text_stats = &output_stats.column_statistics[1];
    assert_eq!(text_stats.distinct_count, Precision::Exact(1));

    Ok(())
}
2631
// With only primitive-width columns projected, the total byte size is
// recomputed from the column widths and the row count.
#[test]
fn test_project_statistics_primitive_width_only() -> Result<()> {
    let input_stats = get_stats();
    let input_schema = get_schema();

    // SELECT col2 AS f, col0 AS i
    let by_column = |name: &str, index: usize, alias: &str| ProjectionExpr {
        expr: Arc::new(Column::new(name, index)),
        alias: alias.to_string(),
    };
    let projection =
        ProjectionExprs::new(vec![by_column("col2", 2, "f"), by_column("col0", 0, "i")]);

    let output_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(input_stats, &output_schema)?;

    // The row count is unchanged.
    assert_eq!(output_stats.num_rows, Precision::Exact(5));

    // Float32 (4 bytes) + Int64 (8 bytes) = 12 bytes per row; 5 rows = 60 bytes.
    assert_eq!(output_stats.total_byte_size, Precision::Exact(60));

    // One statistics entry per projected column.
    assert_eq!(output_stats.column_statistics.len(), 2);

    Ok(())
}
2666
// An empty projection keeps the row count but reports no columns and a total
// byte size of zero.
#[test]
fn test_project_statistics_empty() -> Result<()> {
    let input_stats = get_stats();
    let input_schema = get_schema();

    let projection = ProjectionExprs::new(vec![]);
    let output_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(input_stats, &output_schema)?;

    // The row count is unchanged.
    assert_eq!(output_stats.num_rows, Precision::Exact(5));

    // There are no output columns, so there are no column statistics.
    assert_eq!(output_stats.column_statistics.len(), 0);

    // With nothing projected, the total byte size is 0.
    assert_eq!(output_stats.total_byte_size, Precision::Exact(0));

    Ok(())
}
2690
// Statistics for a non-null numeric literal: the output column is a constant,
// so its min, max, distinct count, null count, byte size and sum are all exact.
#[test]
fn test_project_statistics_with_literal() -> Result<()> {
    let input_stats = get_stats();
    let input_schema = get_schema();

    // SELECT 42 AS constant, col0 AS num
    let constant = ProjectionExpr {
        alias: "constant".to_string(),
        expr: Arc::new(Literal::new(ScalarValue::Int64(Some(42)))),
    };
    let num = ProjectionExpr {
        alias: "num".to_string(),
        expr: Arc::new(Column::new("col0", 0)),
    };
    let projection = ProjectionExprs::new(vec![constant, num]);

    let output_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(input_stats, &output_schema)?;

    // The row count is unchanged and there is one statistics entry per output.
    assert_eq!(output_stats.num_rows, Precision::Exact(5));
    assert_eq!(output_stats.column_statistics.len(), 2);

    // Output 0 is the literal 42: min and max are both the literal.
    let constant_stats = &output_stats.column_statistics[0];
    assert_eq!(
        constant_stats.min_value,
        Precision::Exact(ScalarValue::Int64(Some(42)))
    );
    assert_eq!(
        constant_stats.max_value,
        Precision::Exact(ScalarValue::Int64(Some(42)))
    );
    assert_eq!(constant_stats.distinct_count, Precision::Exact(1));
    assert_eq!(constant_stats.null_count, Precision::Exact(0));
    // Int64 is 8 bytes; 5 rows = 40 bytes.
    assert_eq!(constant_stats.byte_size, Precision::Exact(40));
    // Sum of a constant column = value * num_rows = 42 * 5 = 210.
    assert_eq!(
        constant_stats.sum_value,
        Precision::Exact(ScalarValue::Int64(Some(210)))
    );

    // Output 1 is col0 and keeps its statistics.
    let num_stats = &output_stats.column_statistics[1];
    assert_eq!(num_stats.distinct_count, Precision::Exact(5));
    assert_eq!(
        num_stats.max_value,
        Precision::Exact(ScalarValue::Int64(Some(21)))
    );

    Ok(())
}
2760
// Statistics for a NULL literal: the output column is constant NULL, so every
// row counts as null and the value-based statistics are typed NULLs.
#[test]
fn test_project_statistics_with_null_literal() -> Result<()> {
    let input_stats = get_stats();
    let input_schema = get_schema();

    // SELECT NULL AS null_col, col0 AS num
    let null_col = ProjectionExpr {
        alias: "null_col".to_string(),
        expr: Arc::new(Literal::new(ScalarValue::Int64(None))),
    };
    let num = ProjectionExpr {
        alias: "num".to_string(),
        expr: Arc::new(Column::new("col0", 0)),
    };
    let projection = ProjectionExprs::new(vec![null_col, num]);

    let output_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(input_stats, &output_schema)?;

    // The row count is unchanged and there is one statistics entry per output.
    assert_eq!(output_stats.num_rows, Precision::Exact(5));
    assert_eq!(output_stats.column_statistics.len(), 2);

    // Output 0 is the NULL literal.
    let null_stats = &output_stats.column_statistics[0];
    assert_eq!(
        null_stats.min_value,
        Precision::Exact(ScalarValue::Int64(None))
    );
    assert_eq!(
        null_stats.max_value,
        Precision::Exact(ScalarValue::Int64(None))
    );
    // All NULLs are considered the same value.
    assert_eq!(null_stats.distinct_count, Precision::Exact(1));
    // Every one of the 5 rows is NULL.
    assert_eq!(null_stats.null_count, Precision::Exact(5));
    assert_eq!(null_stats.byte_size, Precision::Exact(0));
    assert_eq!(
        null_stats.sum_value,
        Precision::Exact(ScalarValue::Int64(None))
    );

    // Output 1 is col0 and keeps its statistics.
    let num_stats = &output_stats.column_statistics[1];
    assert_eq!(num_stats.distinct_count, Precision::Exact(5));
    assert_eq!(
        num_stats.max_value,
        Precision::Exact(ScalarValue::Int64(Some(21)))
    );

    Ok(())
}
2828
// Statistics for a literal of a complex type (here a Utf8 string): the
// constant statistics are exact, but byte size and sum are not available.
#[test]
fn test_project_statistics_with_complex_type_literal() -> Result<()> {
    let input_stats = get_stats();
    let input_schema = get_schema();

    // SELECT 'hello' AS text, col0 AS num
    let text = ProjectionExpr {
        alias: "text".to_string(),
        expr: Arc::new(Literal::new(ScalarValue::Utf8(Some("hello".to_string())))),
    };
    let num = ProjectionExpr {
        alias: "num".to_string(),
        expr: Arc::new(Column::new("col0", 0)),
    };
    let projection = ProjectionExprs::new(vec![text, num]);

    let output_schema = projection.project_schema(&input_schema)?;
    let output_stats = projection.project_statistics(input_stats, &output_schema)?;

    // The row count is unchanged and there is one statistics entry per output.
    assert_eq!(output_stats.num_rows, Precision::Exact(5));
    assert_eq!(output_stats.column_statistics.len(), 2);

    // Output 0 is the Utf8 literal 'hello': min and max are both the literal.
    let text_stats = &output_stats.column_statistics[0];
    assert_eq!(
        text_stats.min_value,
        Precision::Exact(ScalarValue::Utf8(Some("hello".to_string())))
    );
    assert_eq!(
        text_stats.max_value,
        Precision::Exact(ScalarValue::Utf8(Some("hello".to_string())))
    );
    assert_eq!(text_stats.distinct_count, Precision::Exact(1));
    assert_eq!(text_stats.null_count, Precision::Exact(0));
    // Complex types (Utf8, List, etc.) report byte_size = Absent because the
    // exact size cannot be calculated without knowing the actual data.
    assert_eq!(text_stats.byte_size, Precision::Absent);
    // Sum is only meaningful for numeric types, so Utf8 reports Absent.
    assert_eq!(text_stats.sum_value, Precision::Absent);

    // Output 1 is col0 and keeps its statistics.
    let num_stats = &output_stats.column_statistics[1];
    assert_eq!(num_stats.distinct_count, Precision::Exact(5));
    assert_eq!(
        num_stats.max_value,
        Precision::Exact(ScalarValue::Int64(Some(21)))
    );

    Ok(())
}
2903}