// datafusion_physical_expr/projection.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ProjectionExpr`] and [`ProjectionExprs`] for representing projections.
19
20use std::ops::Deref;
21use std::sync::Arc;
22
23use crate::PhysicalExpr;
24use crate::expressions::{Column, Literal};
25use crate::utils::collect_columns;
26
27use arrow::array::{RecordBatch, RecordBatchOptions};
28use arrow::datatypes::{Field, Schema, SchemaRef};
29use datafusion_common::stats::{ColumnStatistics, Precision};
30use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
31use datafusion_common::{
32 Result, ScalarValue, Statistics, assert_or_internal_err, internal_datafusion_err,
33 plan_err,
34};
35
36use datafusion_physical_expr_common::metrics::ExecutionPlanMetricsSet;
37use datafusion_physical_expr_common::metrics::ExpressionEvaluatorMetrics;
38use datafusion_physical_expr_common::physical_expr::fmt_sql;
39use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
40use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays_with_metrics;
41use indexmap::IndexMap;
42use itertools::Itertools;
43
44/// An expression used by projection operations.
45///
46/// The expression is evaluated and the result is stored in a column
47/// with the name specified by `alias`.
48///
49/// For example, the SQL expression `a + b AS sum_ab` would be represented
50/// as a `ProjectionExpr` where `expr` is the expression `a + b`
51/// and `alias` is the string `sum_ab`.
52///
53/// See [`ProjectionExprs`] for a collection of projection expressions.
#[derive(Debug, Clone)]
pub struct ProjectionExpr {
    /// The expression that will be evaluated.
    pub expr: Arc<dyn PhysicalExpr>,
    /// The name of the output column for use in an output schema.
    pub alias: String,
}
61
62impl PartialEq for ProjectionExpr {
63 fn eq(&self, other: &Self) -> bool {
64 let ProjectionExpr { expr, alias } = self;
65 expr.eq(&other.expr) && *alias == other.alias
66 }
67}
68
// `PartialEq` above compares both fields, and `PhysicalExpr` equality is an
// equivalence relation, so full `Eq` holds.
impl Eq for ProjectionExpr {}
70
71impl std::fmt::Display for ProjectionExpr {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 if self.expr.to_string() == self.alias {
74 write!(f, "{}", self.alias)
75 } else {
76 write!(f, "{} AS {}", self.expr, self.alias)
77 }
78 }
79}
80
81impl ProjectionExpr {
82 /// Create a new projection expression
83 pub fn new(expr: Arc<dyn PhysicalExpr>, alias: impl Into<String>) -> Self {
84 let alias = alias.into();
85 Self { expr, alias }
86 }
87
88 /// Create a new projection expression from an expression and a schema using the expression's output field name as alias.
89 pub fn new_from_expression(
90 expr: Arc<dyn PhysicalExpr>,
91 schema: &Schema,
92 ) -> Result<Self> {
93 let field = expr.return_field(schema)?;
94 Ok(Self {
95 expr,
96 alias: field.name().to_string(),
97 })
98 }
99}
100
101impl From<(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
102 fn from(value: (Arc<dyn PhysicalExpr>, String)) -> Self {
103 Self::new(value.0, value.1)
104 }
105}
106
107impl From<&(Arc<dyn PhysicalExpr>, String)> for ProjectionExpr {
108 fn from(value: &(Arc<dyn PhysicalExpr>, String)) -> Self {
109 Self::new(Arc::clone(&value.0), value.1.clone())
110 }
111}
112
113impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
114 fn from(value: ProjectionExpr) -> Self {
115 (value.expr, value.alias)
116 }
117}
118
119/// A collection of [`ProjectionExpr`] instances, representing a complete
120/// projection operation.
121///
122/// Projection operations are used in query plans to select specific columns or
123/// compute new columns based on existing ones.
124///
125/// See [`ProjectionExprs::from_indices`] to select a subset of columns by
126/// indices.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionExprs {
    /// The projection expressions, in output-column order.
    /// [`Arc`] used for a cheap clone, which improves physical plan optimization performance.
    exprs: Arc<[ProjectionExpr]>,
}
132
133impl std::fmt::Display for ProjectionExprs {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 let exprs: Vec<String> = self.exprs.iter().map(|e| e.to_string()).collect();
136 write!(f, "Projection[{}]", exprs.join(", "))
137 }
138}
139
140impl From<Vec<ProjectionExpr>> for ProjectionExprs {
141 fn from(value: Vec<ProjectionExpr>) -> Self {
142 Self {
143 exprs: value.into(),
144 }
145 }
146}
147
148impl From<&[ProjectionExpr]> for ProjectionExprs {
149 fn from(value: &[ProjectionExpr]) -> Self {
150 Self {
151 exprs: value.iter().cloned().collect(),
152 }
153 }
154}
155
156impl FromIterator<ProjectionExpr> for ProjectionExprs {
157 fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self {
158 Self {
159 exprs: exprs.into_iter().collect(),
160 }
161 }
162}
163
164impl AsRef<[ProjectionExpr]> for ProjectionExprs {
165 fn as_ref(&self) -> &[ProjectionExpr] {
166 &self.exprs
167 }
168}
169
impl ProjectionExprs {
    /// Make a new [`ProjectionExprs`] from an iterator of expressions.
    pub fn new(exprs: impl IntoIterator<Item = ProjectionExpr>) -> Self {
        Self {
            exprs: exprs.into_iter().collect(),
        }
    }

    /// Make a new [`ProjectionExprs`] from expressions.
    pub fn from_expressions(exprs: impl Into<Arc<[ProjectionExpr]>>) -> Self {
        Self {
            exprs: exprs.into(),
        }
    }

    /// Creates a [`ProjectionExprs`] from a list of column indices.
    ///
    /// This is a convenience method for creating simple column-only projections, where each projection expression is a reference to a column
    /// in the input schema.
    ///
    /// # Behavior
    /// - Ordering: the output projection preserves the exact order of indices provided in the input slice
    ///   For example, `[2, 0, 1]` will produce projections for columns 2, 0, then 1 in that order
    /// - Duplicates: Duplicate indices are allowed and will create multiple projection expressions referencing the same source column
    ///   For example, `[0, 0]` creates 2 separate projections both referencing column 0
    ///
    /// # Panics
    /// Panics if any index in `indices` is out of bounds for the provided schema.
    ///
    /// # Example
    ///
    /// ```rust
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion_physical_expr::projection::ProjectionExprs;
    /// use std::sync::Arc;
    ///
    /// // Create a schema with three columns
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, false),
    ///     Field::new("b", DataType::Utf8, false),
    ///     Field::new("c", DataType::Float64, false),
    /// ]));
    ///
    /// // Project columns at indices 2 and 0 (c and a) - ordering is preserved
    /// let projection = ProjectionExprs::from_indices(&[2, 0], &schema);
    ///
    /// // This creates: SELECT c@2 AS c, a@0 AS a
    /// assert_eq!(projection.as_ref().len(), 2);
    /// assert_eq!(projection.as_ref()[0].alias, "c");
    /// assert_eq!(projection.as_ref()[1].alias, "a");
    ///
    /// // Duplicate indices are allowed
    /// let projection_with_dups = ProjectionExprs::from_indices(&[0, 0, 1], &schema);
    /// assert_eq!(projection_with_dups.as_ref().len(), 3);
    /// assert_eq!(projection_with_dups.as_ref()[0].alias, "a");
    /// assert_eq!(projection_with_dups.as_ref()[1].alias, "a"); // duplicate
    /// assert_eq!(projection_with_dups.as_ref()[2].alias, "b");
    /// ```
    pub fn from_indices(indices: &[usize], schema: &Schema) -> Self {
        let projection_exprs = indices.iter().map(|&i| {
            // `Schema::field` panics on an out-of-bounds index (see # Panics).
            let field = schema.field(i);
            ProjectionExpr {
                expr: Arc::new(Column::new(field.name(), i)),
                alias: field.name().clone(),
            }
        });

        Self::from_iter(projection_exprs)
    }

    /// Returns an iterator over the projection expressions
    pub fn iter(&self) -> impl Iterator<Item = &ProjectionExpr> {
        self.exprs.iter()
    }

    /// Creates a ProjectionMapping from this projection
    pub fn projection_mapping(
        &self,
        input_schema: &SchemaRef,
    ) -> Result<ProjectionMapping> {
        ProjectionMapping::try_new(
            self.exprs
                .iter()
                .map(|p| (Arc::clone(&p.expr), p.alias.clone())),
            input_schema,
        )
    }

    /// Iterate over a clone of the projection expressions.
    pub fn expr_iter(&self) -> impl Iterator<Item = Arc<dyn PhysicalExpr>> + '_ {
        self.exprs.iter().map(|e| Arc::clone(&e.expr))
    }

    /// Apply a fallible transformation to the [`PhysicalExpr`] of each projection.
    ///
    /// This method transforms the expression in each [`ProjectionExpr`] while preserving
    /// the alias. This is useful for rewriting expressions, such as when adapting
    /// expressions to a different schema.
    ///
    /// # Example
    ///
    /// ```rust
    /// use std::sync::Arc;
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion_common::Result;
    /// use datafusion_physical_expr::expressions::Column;
    /// use datafusion_physical_expr::projection::ProjectionExprs;
    /// use datafusion_physical_expr::PhysicalExpr;
    ///
    /// // Create a schema and projection
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, false),
    ///     Field::new("b", DataType::Int32, false),
    /// ]));
    /// let projection = ProjectionExprs::from_indices(&[0, 1], &schema);
    ///
    /// // Transform each expression (this example just clones them)
    /// let transformed = projection.try_map_exprs(|expr| Ok(expr))?;
    /// assert_eq!(transformed.as_ref().len(), 2);
    /// # Ok::<(), datafusion_common::DataFusionError>(())
    /// ```
    pub fn try_map_exprs<F>(self, mut f: F) -> Result<Self>
    where
        F: FnMut(Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>,
    {
        // Clone each entry, replace its expression, and short-circuit on the
        // first error via the fallible `collect` into `Result<Arc<_>>`.
        let exprs = self
            .exprs
            .iter()
            .cloned()
            .map(|mut proj| {
                proj.expr = f(proj.expr)?;
                Ok(proj)
            })
            .collect::<Result<Arc<_>>>()?;
        Ok(Self::from_expressions(exprs))
    }

    /// Apply another projection on top of this projection, returning the combined projection.
    /// For example, if this projection is `SELECT c@2 AS x, b@1 AS y, a@0 as z` and the other projection is `SELECT x@0 + 1 AS c1, y@1 + z@2 as c2`,
    /// we return a projection equivalent to `SELECT c@2 + 1 AS c1, b@1 + a@0 as c2`.
    ///
    /// # Example
    ///
    /// ```rust
    /// use datafusion_common::{Result, ScalarValue};
    /// use datafusion_expr::Operator;
    /// use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
    /// use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs};
    /// use std::sync::Arc;
    ///
    /// fn main() -> Result<()> {
    ///     // Example from the docstring:
    ///     // Base projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
    ///     let base = ProjectionExprs::new(vec![
    ///         ProjectionExpr {
    ///             expr: Arc::new(Column::new("c", 2)),
    ///             alias: "x".to_string(),
    ///         },
    ///         ProjectionExpr {
    ///             expr: Arc::new(Column::new("b", 1)),
    ///             alias: "y".to_string(),
    ///         },
    ///         ProjectionExpr {
    ///             expr: Arc::new(Column::new("a", 0)),
    ///             alias: "z".to_string(),
    ///         },
    ///     ]);
    ///
    ///     // Top projection: SELECT x@0 + 1 AS c1, y@1 + z@2 AS c2
    ///     let top = ProjectionExprs::new(vec![
    ///         ProjectionExpr {
    ///             expr: Arc::new(BinaryExpr::new(
    ///                 Arc::new(Column::new("x", 0)),
    ///                 Operator::Plus,
    ///                 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
    ///             )),
    ///             alias: "c1".to_string(),
    ///         },
    ///         ProjectionExpr {
    ///             expr: Arc::new(BinaryExpr::new(
    ///                 Arc::new(Column::new("y", 1)),
    ///                 Operator::Plus,
    ///                 Arc::new(Column::new("z", 2)),
    ///             )),
    ///             alias: "c2".to_string(),
    ///         },
    ///     ]);
    ///
    ///     // Expected result: SELECT c@2 + 1 AS c1, b@1 + a@0 AS c2
    ///     let result = base.try_merge(&top)?;
    ///
    ///     assert_eq!(result.as_ref().len(), 2);
    ///     assert_eq!(result.as_ref()[0].alias, "c1");
    ///     assert_eq!(result.as_ref()[1].alias, "c2");
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// # Errors
    /// This function returns an error if any expression in the `other` projection cannot be
    /// applied on top of this projection.
    pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
        let mut new_exprs = Vec::with_capacity(other.exprs.len());
        for proj_expr in other.exprs.iter() {
            // Rewrite each outer expression so it references this projection's
            // *inputs* instead of this projection's outputs; the outer alias wins.
            new_exprs.push(ProjectionExpr {
                expr: self.unproject_expr(&proj_expr.expr)?,
                alias: proj_expr.alias.clone(),
            });
        }
        Ok(ProjectionExprs::new(new_exprs))
    }

    /// Extract the column indices used in this projection.
    /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
    /// this function would return `[0, 1]`.
    /// Repeated indices are returned only once, and the order is ascending.
    pub fn column_indices(&self) -> Vec<usize> {
        self.exprs
            .iter()
            .flat_map(|e| collect_columns(&e.expr).into_iter().map(|col| col.index()))
            .sorted_unstable()
            .dedup()
            .collect_vec()
    }

    /// Extract the ordered column indices for a column-only projection.
    ///
    /// This function assumes that all expressions in the projection are simple column references.
    /// It returns the column indices in the order they appear in the projection.
    ///
    /// # Panics
    ///
    /// Panics if any expression in the projection is not a simple column reference. This includes:
    /// - Computed expressions (e.g., `a + 1`, `CAST(a AS INT)`)
    /// - Function calls (e.g., `UPPER(name)`, `SUM(amount)`)
    /// - Literals (e.g., `42`, `'hello'`)
    /// - Complex nested expressions (e.g., `CASE WHEN ... THEN ... END`)
    ///
    /// # Returns
    ///
    /// A vector of column indices in projection order. Unlike [`column_indices()`](Self::column_indices),
    /// this function:
    /// - Preserves the projection order (does not sort)
    /// - Preserves duplicates (does not deduplicate)
    ///
    /// # Example
    ///
    /// For a projection `SELECT c, a, c` where `a` is at index 0 and `c` is at index 2,
    /// this function would return `[2, 0, 2]`.
    ///
    /// Use [`column_indices()`](Self::column_indices) instead if the projection may contain
    /// non-column expressions or if you need a deduplicated sorted list.
    #[deprecated(
        since = "52.0.0",
        note = "Use column_indices() instead. This method will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
    )]
    pub fn ordered_column_indices(&self) -> Vec<usize> {
        self.exprs
            .iter()
            .map(|e| {
                e.expr
                    .as_any()
                    .downcast_ref::<Column>()
                    .expect("Expected column reference in projection")
                    .index()
            })
            .collect()
    }

    /// Project a schema according to this projection.
    ///
    /// For example, given a projection:
    /// * `SELECT a AS x, b + 1 AS y`
    /// * where `a` is at index 0
    /// * `b` is at index 1
    ///
    /// If the input schema is `[a: Int32, b: Int32, c: Int32]`, the output
    /// schema would be `[x: Int32, y: Int32]`.
    ///
    /// Note that [`Field`] metadata are preserved from the input schema.
    pub fn project_schema(&self, input_schema: &Schema) -> Result<Schema> {
        let fields: Result<Vec<Field>> = self
            .exprs
            .iter()
            .map(|proj_expr| {
                // Carry the expression's field metadata through to the output.
                let metadata = proj_expr
                    .expr
                    .return_field(input_schema)?
                    .metadata()
                    .clone();

                let field = Field::new(
                    &proj_expr.alias,
                    proj_expr.expr.data_type(input_schema)?,
                    proj_expr.expr.nullable(input_schema)?,
                )
                .with_metadata(metadata);

                Ok(field)
            })
            .collect();

        // Schema-level metadata is also preserved from the input.
        Ok(Schema::new_with_metadata(
            fields?,
            input_schema.metadata().clone(),
        ))
    }

    /// "unproject" an expression by applying this projection in reverse,
    /// returning a new set of expressions that reference the original input
    /// columns.
    ///
    /// For example, consider
    /// * an expression `c1_c2 > 5`, and a schema `[c1, c2]`
    /// * a projection `c1 + c2 as c1_c2`
    ///
    /// This method would rewrite the expression to `c1 + c2 > 5`
    pub fn unproject_expr(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        update_expr(expr, &self.exprs, true)?.ok_or_else(|| {
            internal_datafusion_err!(
                "Failed to unproject an expression {} with ProjectionExprs {}",
                expr,
                self.exprs.iter().map(|e| format!("{e}")).join(", ")
            )
        })
    }

    /// "project" an expression using these projection's expressions
    ///
    /// For example, consider
    /// * an expression `c1 + c2 > 5`, and a schema `[c1, c2]`
    /// * a projection `c1 + c2 as c1_c2`
    ///
    /// * This method would rewrite the expression to `c1_c2 > 5`
    pub fn project_expr(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        update_expr(expr, &self.exprs, false)?.ok_or_else(|| {
            internal_datafusion_err!(
                "Failed to project an expression {} with ProjectionExprs {}",
                expr,
                self.exprs.iter().map(|e| format!("{e}")).join(", ")
            )
        })
    }

    /// Create a new [`Projector`] from this projection and an input schema.
    ///
    /// A [`Projector`] can be used to apply this projection to record batches.
    ///
    /// # Errors
    /// This function returns an error if the output schema cannot be constructed from the input schema
    /// with the given projection expressions.
    /// For example, if an expression only works with integer columns but the input schema has a string column at that index.
    pub fn make_projector(&self, input_schema: &Schema) -> Result<Projector> {
        let output_schema = Arc::new(self.project_schema(input_schema)?);
        Ok(Projector {
            projection: self.clone(),
            output_schema,
            expression_metrics: None,
        })
    }

    /// Create [`ExpressionEvaluatorMetrics`] for this projection.
    ///
    /// One label is produced per expression, rendered as SQL via [`fmt_sql`],
    /// with an ` AS alias` suffix appended when the alias differs from the
    /// expression's textual form.
    pub fn create_expression_metrics(
        &self,
        metrics: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> ExpressionEvaluatorMetrics {
        let labels: Vec<String> = self
            .exprs
            .iter()
            .map(|proj_expr| {
                let expr_sql = fmt_sql(proj_expr.expr.as_ref()).to_string();
                if proj_expr.expr.to_string() == proj_expr.alias {
                    expr_sql
                } else {
                    format!("{expr_sql} AS {}", proj_expr.alias)
                }
            })
            .collect();
        ExpressionEvaluatorMetrics::new(metrics, partition, labels)
    }

    /// Project statistics according to this projection.
    /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
    /// if the input statistics has column statistics for columns `a`, `b`, and `c`, the output statistics would have column statistics for columns `x` and `y`.
    ///
    /// # Example
    ///
    /// ```rust
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
    /// use datafusion_physical_expr::projection::ProjectionExprs;
    /// use datafusion_common::Result;
    /// use datafusion_common::ScalarValue;
    /// use std::sync::Arc;
    ///
    /// fn main() -> Result<()> {
    ///     // Input schema: a: Int32, b: Int32, c: Int32
    ///     let input_schema = Arc::new(Schema::new(vec![
    ///         Field::new("a", DataType::Int32, false),
    ///         Field::new("b", DataType::Int32, false),
    ///         Field::new("c", DataType::Int32, false),
    ///     ]));
    ///
    ///     // Input statistics with column stats for a, b, c
    ///     let input_stats = Statistics {
    ///         num_rows: Precision::Exact(100),
    ///         total_byte_size: Precision::Exact(1200),
    ///         column_statistics: vec![
    ///             // Column a stats
    ///             ColumnStatistics::new_unknown()
    ///                 .with_null_count(Precision::Exact(0))
    ///                 .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
    ///                 .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
    ///                 .with_distinct_count(Precision::Exact(100)),
    ///             // Column b stats
    ///             ColumnStatistics::new_unknown()
    ///                 .with_null_count(Precision::Exact(0))
    ///                 .with_min_value(Precision::Exact(ScalarValue::Int32(Some(10))))
    ///                 .with_max_value(Precision::Exact(ScalarValue::Int32(Some(60))))
    ///                 .with_distinct_count(Precision::Exact(50)),
    ///             // Column c stats
    ///             ColumnStatistics::new_unknown()
    ///                 .with_null_count(Precision::Exact(5))
    ///                 .with_min_value(Precision::Exact(ScalarValue::Int32(Some(-10))))
    ///                 .with_max_value(Precision::Exact(ScalarValue::Int32(Some(200))))
    ///                 .with_distinct_count(Precision::Exact(25)),
    ///         ],
    ///     };
    ///
    ///     // Create a projection that selects columns c and a (indices 2 and 0)
    ///     let projection = ProjectionExprs::from_indices(&[2, 0], &input_schema);
    ///
    ///     // Compute output schema
    ///     let output_schema = projection.project_schema(&input_schema)?;
    ///
    ///     // Project the statistics
    ///     let output_stats = projection.project_statistics(input_stats, &output_schema)?;
    ///
    ///     // The output should have 2 column statistics (for c and a, in that order)
    ///     assert_eq!(output_stats.column_statistics.len(), 2);
    ///
    ///     // First column in output is c (was at index 2)
    ///     assert_eq!(
    ///         output_stats.column_statistics[0].min_value,
    ///         Precision::Exact(ScalarValue::Int32(Some(-10)))
    ///     );
    ///     assert_eq!(
    ///         output_stats.column_statistics[0].null_count,
    ///         Precision::Exact(5)
    ///     );
    ///
    ///     // Second column in output is a (was at index 0)
    ///     assert_eq!(
    ///         output_stats.column_statistics[1].min_value,
    ///         Precision::Exact(ScalarValue::Int32(Some(0)))
    ///     );
    ///     assert_eq!(
    ///         output_stats.column_statistics[1].distinct_count,
    ///         Precision::Exact(100)
    ///     );
    ///
    ///     // Total byte size is recalculated based on projected columns
    ///     assert_eq!(
    ///         output_stats.total_byte_size,
    ///         Precision::Exact(800), // each Int32 column is 4 bytes * 100 rows * 2 columns
    ///     );
    ///
    ///     // Number of rows remains the same
    ///     assert_eq!(output_stats.num_rows, Precision::Exact(100));
    ///
    ///     Ok(())
    /// }
    /// ```
    pub fn project_statistics(
        &self,
        mut stats: Statistics,
        output_schema: &Schema,
    ) -> Result<Statistics> {
        let mut column_statistics = vec![];

        for proj_expr in self.exprs.iter() {
            let expr = &proj_expr.expr;
            let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() {
                // Simple column reference: move the input column's statistics
                // into the output slot. `take` leaves `new_unknown`-style
                // defaults behind, so a column projected twice only carries
                // its statistics the first time it is referenced.
                std::mem::take(&mut stats.column_statistics[col.index()])
            } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
                // Handle literal expressions (constants) by calculating proper statistics
                let data_type = expr.data_type(output_schema)?;

                if literal.value().is_null() {
                    // A null literal column is null in every row.
                    let null_count = match stats.num_rows {
                        Precision::Exact(num_rows) => Precision::Exact(num_rows),
                        _ => Precision::Absent,
                    };

                    ColumnStatistics {
                        min_value: Precision::Exact(literal.value().clone()),
                        max_value: Precision::Exact(literal.value().clone()),
                        distinct_count: Precision::Exact(1),
                        null_count,
                        sum_value: Precision::Exact(literal.value().clone()),
                        byte_size: Precision::Exact(0),
                    }
                } else {
                    let value = literal.value();
                    let distinct_count = Precision::Exact(1);
                    let null_count = Precision::Exact(0);

                    let byte_size = if let Some(byte_width) = data_type.primitive_width()
                    {
                        stats.num_rows.multiply(&Precision::Exact(byte_width))
                    } else {
                        // Complex types depend on array encoding, so set to Absent
                        Precision::Absent
                    };

                    // sum = value * row_count (when the row count can be cast
                    // to the literal's data type).
                    let sum_value = Precision::<ScalarValue>::from(stats.num_rows)
                        .cast_to(&value.data_type())
                        .ok()
                        .map(|row_count| {
                            Precision::Exact(value.clone()).multiply(&row_count)
                        })
                        .unwrap_or(Precision::Absent);

                    ColumnStatistics {
                        min_value: Precision::Exact(value.clone()),
                        max_value: Precision::Exact(value.clone()),
                        distinct_count,
                        null_count,
                        sum_value,
                        byte_size,
                    }
                }
            } else {
                // TODO stats: estimate more statistics from expressions
                // (expressions should compute their statistics themselves)
                ColumnStatistics::new_unknown()
            };
            column_statistics.push(col_stats);
        }
        // NOTE(review): the total byte size is recomputed while
        // `stats.column_statistics` still holds the (partially taken) input
        // column statistics, before being replaced below — confirm this
        // ordering is intentional in `calculate_total_byte_size`.
        stats.calculate_total_byte_size(output_schema);
        stats.column_statistics = column_statistics;
        Ok(stats)
    }
}
725
impl<'a> IntoIterator for &'a ProjectionExprs {
    type Item = &'a ProjectionExpr;
    type IntoIter = std::slice::Iter<'a, ProjectionExpr>;

    /// Iterate over the projection expressions by reference, enabling
    /// `for expr in &projection_exprs { .. }`.
    fn into_iter(self) -> Self::IntoIter {
        self.exprs.iter()
    }
}
734
735/// Applies a projection to record batches.
736///
737/// A [`Projector`] uses a set of projection expressions to transform
738/// and a pre-computed output schema to project record batches accordingly.
739///
740/// The main reason to use a `Projector` is to avoid repeatedly computing
741/// the output schema for each batch, which can be costly if the projection
742/// expressions are complex.
#[derive(Clone, Debug)]
pub struct Projector {
    /// The projection expressions evaluated against each input batch.
    projection: ProjectionExprs,
    /// Output schema, computed once at construction time and reused for
    /// every projected batch.
    output_schema: SchemaRef,
    /// If `Some`, metrics will be tracked for projection evaluation.
    expression_metrics: Option<ExpressionEvaluatorMetrics>,
}
750
751impl Projector {
752 /// Construct the projector with metrics. After execution, related metrics will
753 /// be tracked inside `ExecutionPlanMetricsSet`
754 ///
755 /// See [`ExpressionEvaluatorMetrics`] for details.
756 pub fn with_metrics(
757 &self,
758 metrics: &ExecutionPlanMetricsSet,
759 partition: usize,
760 ) -> Self {
761 let expr_metrics = self
762 .projection
763 .create_expression_metrics(metrics, partition);
764 Self {
765 expression_metrics: Some(expr_metrics),
766 projection: self.projection.clone(),
767 output_schema: Arc::clone(&self.output_schema),
768 }
769 }
770
771 /// Project a record batch according to this projector's expressions.
772 ///
773 /// # Errors
774 /// This function returns an error if any expression evaluation fails
775 /// or if the output schema of the resulting record batch does not match
776 /// the pre-computed output schema of the projector.
777 pub fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
778 let arrays = evaluate_expressions_to_arrays_with_metrics(
779 self.projection.exprs.iter().map(|p| &p.expr),
780 batch,
781 self.expression_metrics.as_ref(),
782 )?;
783
784 if arrays.is_empty() {
785 let options =
786 RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
787 RecordBatch::try_new_with_options(
788 Arc::clone(&self.output_schema),
789 arrays,
790 &options,
791 )
792 .map_err(Into::into)
793 } else {
794 RecordBatch::try_new(Arc::clone(&self.output_schema), arrays)
795 .map_err(Into::into)
796 }
797 }
798
799 pub fn output_schema(&self) -> &SchemaRef {
800 &self.output_schema
801 }
802
803 pub fn projection(&self) -> &ProjectionExprs {
804 &self.projection
805 }
806}
807
/// Describes an immutable reference counted projection.
///
/// This structure represents projecting a set of columns by index.
/// [`Arc`] is used to make it cheap to clone.
pub type ProjectionRef = Arc<[usize]>;
813
814/// Combine two projections.
815///
816/// If `p1` is [`None`] then there are no changes.
817/// Otherwise, if passed `p2` is not [`None`] then it is remapped
818/// according to the `p1`. Otherwise, there are no changes.
819///
820/// # Example
821///
822/// If stored projection is [0, 2] and we call `apply_projection([0, 2, 3])`,
823/// then the resulting projection will be [0, 3].
824///
825/// # Error
826///
827/// Returns an internal error if `p1` contains index that is greater than `p2` len.
828///
829pub fn combine_projections(
830 p1: Option<&ProjectionRef>,
831 p2: Option<&ProjectionRef>,
832) -> Result<Option<ProjectionRef>> {
833 let Some(p1) = p1 else {
834 return Ok(None);
835 };
836 let Some(p2) = p2 else {
837 return Ok(Some(Arc::clone(p1)));
838 };
839
840 Ok(Some(
841 p1.iter()
842 .map(|i| {
843 let idx = *i;
844 assert_or_internal_err!(
845 idx < p2.len(),
846 "unable to apply projection: index {} is greater than new projection len {}",
847 idx,
848 p2.len(),
849 );
850 Ok(p2[*i])
851 })
852 .collect::<Result<Arc<[usize]>>>()?,
853 ))
854}
855
856/// The function projects / unprojects an expression with respect to set of
857/// projection expressions.
858///
859/// See also [`ProjectionExprs::unproject_expr`] and [`ProjectionExprs::project_expr`]
860///
861/// 1) When `unproject` is `true`:
862///
863/// Rewrites an expression with respect to the projection expressions,
864/// effectively "unprojecting" it to reference the original input columns.
865///
866/// For example, given
867/// * the expressions `a@1 + b@2` and `c@0`
868/// * and projection expressions `c@2, a@0, b@1`
869///
870/// Then
871/// * `a@1 + b@2` becomes `a@0 + b@1`
872/// * `c@0` becomes `c@2`
873///
874/// 2) When `unproject` is `false`:
875///
876/// Rewrites the expression to reference the projected expressions,
877/// effectively "projecting" it. The resulting expression will reference the
878/// indices as they appear in the projection.
879///
880/// If the expression cannot be rewritten after the projection, it returns
881/// `None`.
882///
883/// For example, given
884/// * the expressions `c@0`, `a@1` and `b@2`
885/// * the projection `a@1 as a, c@0 as c_new`,
886///
887/// Then
888/// * `c@0` becomes `c_new@1`
889/// * `a@1` becomes `a@0`
890/// * `b@2` results in `None` since the projection does not include `b`.
891///
892/// # Errors
893/// This function returns an error if `unproject` is `true` and if any expression references
894/// an index that is out of bounds for `projected_exprs`.
895/// For example:
896///
897/// - `expr` is `a@3`
898/// - `projected_exprs` is \[`a@0`, `b@1`\]
899///
900/// In this case, `a@3` references index 3, which is out of bounds for `projected_exprs` (which has length 2).
pub fn update_expr(
    expr: &Arc<dyn PhysicalExpr>,
    projected_exprs: &[ProjectionExpr],
    unproject: bool,
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
    // Tracks the overall outcome of the rewrite across the whole tree.
    #[derive(Debug, PartialEq)]
    enum RewriteState {
        /// The expression is unchanged.
        Unchanged,
        /// Some part of the expression has been rewritten
        RewrittenValid,
        /// Some part of the expression has been rewritten, but some column
        /// references could not be.
        RewrittenInvalid,
    }

    let mut state = RewriteState::Unchanged;

    // `transform_up` visits children before parents, so each `Column` leaf is
    // considered exactly once.
    let new_expr = Arc::clone(expr)
        .transform_up(|expr| {
            // Once any column failed to rewrite, leave the rest untouched.
            if state == RewriteState::RewrittenInvalid {
                return Ok(Transformed::no(expr));
            }

            // Only `Column` nodes are rewritten; everything else passes through.
            let Some(column) = expr.as_any().downcast_ref::<Column>() else {
                return Ok(Transformed::no(expr));
            };
            if unproject {
                state = RewriteState::RewrittenValid;
                // Update the index of `column`:
                // the column's index selects which projection expression
                // replaces it (out-of-bounds is an internal error).
                let projected_expr = projected_exprs.get(column.index()).ok_or_else(|| {
                    internal_datafusion_err!(
                        "Column index {} out of bounds for projected expressions of length {}",
                        column.index(),
                        projected_exprs.len()
                    )
                })?;
                Ok(Transformed::yes(Arc::clone(&projected_expr.expr)))
            } else {
                // default to invalid, in case we can't find the relevant column
                state = RewriteState::RewrittenInvalid;
                // Determine how to update `column` to accommodate `projected_exprs`:
                // find a projected column with the same name and source index,
                // and re-point `column` at its output position/alias.
                projected_exprs
                    .iter()
                    .enumerate()
                    .find_map(|(index, proj_expr)| {
                        proj_expr.expr.as_any().downcast_ref::<Column>().and_then(
                            |projected_column| {
                                (column.name().eq(projected_column.name())
                                    && column.index() == projected_column.index())
                                .then(|| {
                                    state = RewriteState::RewrittenValid;
                                    Arc::new(Column::new(&proj_expr.alias, index)) as _
                                })
                            },
                        )
                    })
                    .map_or_else(
                        || Ok(Transformed::no(expr)),
                        |c| Ok(Transformed::yes(c)),
                    )
            }
        })
        .data()?;

    match state {
        RewriteState::RewrittenInvalid => Ok(None),
        // Both Unchanged and RewrittenValid are valid:
        // - Unchanged means no columns to rewrite (e.g., literals)
        // - RewrittenValid means columns were successfully rewritten
        RewriteState::Unchanged | RewriteState::RewrittenValid => Ok(Some(new_expr)),
    }
}
974
/// Stores target expressions, along with their indices, that associate with a
/// source expression in a projection mapping.
#[derive(Clone, Debug, Default)]
pub struct ProjectionTargets {
    /// A non-empty vector of pairs of target expressions and their indices.
    /// Consider using a special non-empty collection type in the future (e.g.
    /// if Rust provides one in the standard library).
    // NOTE(review): the derived `Default` produces an *empty* vector, so the
    // non-emptiness invariant (relied on by `first()`) is only upheld when
    // construction sites push at least one target after `or_default()`.
    exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
}
984
985impl ProjectionTargets {
986 /// Returns the first target expression and its index.
987 pub fn first(&self) -> &(Arc<dyn PhysicalExpr>, usize) {
988 // Since the vector is non-empty, we can safely unwrap:
989 self.exprs_indices.first().unwrap()
990 }
991
992 /// Adds a target expression and its index to the list of targets.
993 pub fn push(&mut self, target: (Arc<dyn PhysicalExpr>, usize)) {
994 self.exprs_indices.push(target);
995 }
996}
997
998impl Deref for ProjectionTargets {
999 type Target = [(Arc<dyn PhysicalExpr>, usize)];
1000
1001 fn deref(&self) -> &Self::Target {
1002 &self.exprs_indices
1003 }
1004}
1005
1006impl From<Vec<(Arc<dyn PhysicalExpr>, usize)>> for ProjectionTargets {
1007 fn from(exprs_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>) -> Self {
1008 Self { exprs_indices }
1009 }
1010}
1011
/// Stores the mapping between source expressions and target expressions for a
/// projection.
#[derive(Clone, Debug)]
pub struct ProjectionMapping {
    /// Mapping between source expressions and target expressions.
    /// Vector indices correspond to the indices after projection.
    // NOTE(review): `IndexMap` preserves insertion order, and `try_new`
    // inserts sources in output-expression order — so iteration visits
    // sources in the projection's output order.
    map: IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>,
}
1020
impl ProjectionMapping {
    /// Constructs the mapping between a projection's input and output
    /// expressions.
    ///
    /// For example, given the input projection expressions (`a + b`, `c + d`)
    /// and an output schema with two columns `"c + d"` and `"a + b"`, the
    /// projection mapping would be:
    ///
    /// ```text
    /// [0]: (c + d, [(col("c + d"), 0)])
    /// [1]: (a + b, [(col("a + b"), 1)])
    /// ```
    ///
    /// where `col("c + d")` means the column named `"c + d"`.
    ///
    /// # Errors
    /// Returns an internal error if a [`Column`] inside an input expression
    /// carries a name that differs from the field at its index in
    /// `input_schema`.
    pub fn try_new(
        expr: impl IntoIterator<Item = (Arc<dyn PhysicalExpr>, String)>,
        input_schema: &SchemaRef,
    ) -> Result<Self> {
        // Construct a map from the input expressions to the output expression of the projection:
        let mut map = IndexMap::<_, ProjectionTargets>::new();
        for (expr_idx, (expr, name)) in expr.into_iter().enumerate() {
            // The output side of the mapping is always a bare column named
            // after the projection alias, at the projection's output index.
            let target_expr = Arc::new(Column::new(&name, expr_idx)) as _;
            let source_expr = expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
                Some(col) => {
                    // Sometimes, an expression and its name in the input_schema
                    // doesn't match. This can cause problems, so we make sure
                    // that the expression name matches with the name in `input_schema`.
                    // Conceptually, `source_expr` and `expression` should be the same.
                    let idx = col.index();
                    let matching_field = input_schema.field(idx);
                    let matching_name = matching_field.name();
                    assert_or_internal_err!(
                        col.name() == matching_name,
                        "Input field name {matching_name} does not match with the projection expression {}",
                        col.name()
                    );
                    let matching_column = Column::new(matching_name, idx);
                    Ok(Transformed::yes(Arc::new(matching_column)))
                }
                None => Ok(Transformed::no(e)),
            })
            .data()?;
            // Several output columns may share one source expression
            // (e.g. `a AS x, a AS y`), hence the multi-target entry.
            map.entry(source_expr)
                .or_default()
                .push((target_expr, expr_idx));
        }
        Ok(Self { map })
    }

    /// Constructs a subset mapping using the provided indices.
    ///
    /// This is used when the output is a subset of the input without any
    /// other transformations. The indices are for columns in the schema.
    pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Result<Self> {
        let projection_exprs = indices.iter().map(|index| {
            let field = schema.field(*index);
            // Each selected column keeps its original name and input index.
            let column = Arc::new(Column::new(field.name(), *index));
            (column as _, field.name().clone())
        });
        ProjectionMapping::try_new(projection_exprs, schema)
    }
}
1083
1084impl Deref for ProjectionMapping {
1085 type Target = IndexMap<Arc<dyn PhysicalExpr>, ProjectionTargets>;
1086
1087 fn deref(&self) -> &Self::Target {
1088 &self.map
1089 }
1090}
1091
1092impl FromIterator<(Arc<dyn PhysicalExpr>, ProjectionTargets)> for ProjectionMapping {
1093 fn from_iter<T: IntoIterator<Item = (Arc<dyn PhysicalExpr>, ProjectionTargets)>>(
1094 iter: T,
1095 ) -> Self {
1096 Self {
1097 map: IndexMap::from_iter(iter),
1098 }
1099 }
1100}
1101
1102/// Projects a slice of [LexOrdering]s onto the given schema.
1103///
1104/// This is a convenience wrapper that applies [project_ordering] to each
1105/// input ordering and collects the successful projections:
1106/// - For each input ordering, the result of [project_ordering] is appended to
1107/// the output if it is `Some(...)`.
1108/// - Order is preserved and no deduplication is attempted.
1109/// - If none of the input orderings can be projected, an empty `Vec` is
1110/// returned.
1111///
1112/// See [project_ordering] for the semantics of projecting a single
1113/// [LexOrdering].
1114pub fn project_orderings(
1115 orderings: &[LexOrdering],
1116 schema: &SchemaRef,
1117) -> Vec<LexOrdering> {
1118 let mut projected_orderings = vec![];
1119
1120 for ordering in orderings {
1121 projected_orderings.extend(project_ordering(ordering, schema));
1122 }
1123
1124 projected_orderings
1125}
1126
1127/// Projects a single [LexOrdering] onto the given schema.
1128///
1129/// This function attempts to rewrite every [PhysicalSortExpr] in the provided
1130/// [LexOrdering] so that any [Column] expressions point at the correct field
1131/// indices in `schema`.
1132///
1133/// Key details:
1134/// - Columns are matched by name, not by index. The index of each matched
1135/// column is looked up with [Schema::column_with_name](arrow::datatypes::Schema::column_with_name) and a new
1136/// [Column] with the correct [index](Column::index) is substituted.
1137/// - If an expression references a column name that does not exist in
1138/// `schema`, projection of the current ordering stops and only the already
1139/// rewritten prefix is kept. This models the fact that a lexicographical
1140/// ordering remains valid for any leading prefix whose expressions are
1141/// present in the projected schema.
1142/// - If no expressions can be projected (i.e. the first one is missing), the
1143/// function returns `None`.
1144///
1145/// Return value:
1146/// - `Some(LexOrdering)` if at least one sort expression could be projected.
1147/// The returned ordering may be a strict prefix of the input ordering.
1148/// - `None` if no part of the ordering can be projected onto `schema`.
1149///
1150/// Example
1151///
1152/// Suppose we have an input ordering `[col("a@0"), col("b@1")]` but the projected
1153/// schema only contains b and not a. The result will be `Some([col("a@0")])`. In other
1154/// words, the column reference is reindexed to match the projected schema.
1155/// If neither a nor b is present, the result will be None.
1156pub fn project_ordering(
1157 ordering: &LexOrdering,
1158 schema: &SchemaRef,
1159) -> Option<LexOrdering> {
1160 let mut projected_exprs = vec![];
1161 for PhysicalSortExpr { expr, options } in ordering.iter() {
1162 let transformed = Arc::clone(expr).transform_up(|expr| {
1163 let Some(col) = expr.as_any().downcast_ref::<Column>() else {
1164 return Ok(Transformed::no(expr));
1165 };
1166
1167 let name = col.name();
1168 if let Some((idx, _)) = schema.column_with_name(name) {
1169 // Compute the new column expression (with correct index) after projection:
1170 Ok(Transformed::yes(Arc::new(Column::new(name, idx))))
1171 } else {
1172 // Cannot find expression in the projected_schema,
1173 // signal this using an Err result
1174 plan_err!("")
1175 }
1176 });
1177
1178 match transformed {
1179 Ok(transformed) => {
1180 projected_exprs.push(PhysicalSortExpr::new(transformed.data, *options));
1181 }
1182 Err(_) => {
1183 // Err result indicates an expression could not be found in the
1184 // projected_schema, stop iterating since rest of the orderings are violated
1185 break;
1186 }
1187 }
1188 }
1189
1190 LexOrdering::new(projected_exprs)
1191}
1192
1193#[cfg(test)]
1194pub(crate) mod tests {
1195 use std::collections::HashMap;
1196
1197 use super::*;
1198 use crate::equivalence::{EquivalenceProperties, convert_to_orderings};
1199 use crate::expressions::{BinaryExpr, Literal, col};
1200 use crate::utils::tests::TestScalarUDF;
1201 use crate::{PhysicalExprRef, ScalarFunctionExpr};
1202
1203 use arrow::compute::SortOptions;
1204 use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
1205 use datafusion_common::config::ConfigOptions;
1206 use datafusion_common::stats::Precision;
1207 use datafusion_common::{ScalarValue, Statistics};
1208 use datafusion_expr::{Operator, ScalarUDF};
1209 use insta::assert_snapshot;
1210
1211 pub(crate) fn output_schema(
1212 mapping: &ProjectionMapping,
1213 input_schema: &Arc<Schema>,
1214 ) -> Result<SchemaRef> {
1215 // Calculate output schema:
1216 let mut fields = vec![];
1217 for (source, targets) in mapping.iter() {
1218 let data_type = source.data_type(input_schema)?;
1219 let nullable = source.nullable(input_schema)?;
1220 for (target, _) in targets.iter() {
1221 let Some(column) = target.as_any().downcast_ref::<Column>() else {
1222 return plan_err!("Expects to have column");
1223 };
1224 fields.push(Field::new(column.name(), data_type.clone(), nullable));
1225 }
1226 }
1227
1228 let output_schema = Arc::new(Schema::new_with_metadata(
1229 fields,
1230 input_schema.metadata().clone(),
1231 ));
1232
1233 Ok(output_schema)
1234 }
1235
    #[test]
    fn project_orderings() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, true),
            Field::new("e", DataType::Int32, true),
            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
        ]));
        let col_a = &col("a", &schema)?;
        let col_b = &col("b", &schema)?;
        let col_c = &col("c", &schema)?;
        let col_d = &col("d", &schema)?;
        let col_e = &col("e", &schema)?;
        let col_ts = &col("ts", &schema)?;
        let a_plus_b = Arc::new(BinaryExpr::new(
            Arc::clone(col_a),
            Operator::Plus,
            Arc::clone(col_b),
        )) as Arc<dyn PhysicalExpr>;
        let b_plus_d = Arc::new(BinaryExpr::new(
            Arc::clone(col_b),
            Operator::Plus,
            Arc::clone(col_d),
        )) as Arc<dyn PhysicalExpr>;
        let b_plus_e = Arc::new(BinaryExpr::new(
            Arc::clone(col_b),
            Operator::Plus,
            Arc::clone(col_e),
        )) as Arc<dyn PhysicalExpr>;
        let c_plus_d = Arc::new(BinaryExpr::new(
            Arc::clone(col_c),
            Operator::Plus,
            Arc::clone(col_d),
        )) as Arc<dyn PhysicalExpr>;

        let option_asc = SortOptions {
            descending: false,
            nulls_first: false,
        };
        let option_desc = SortOptions {
            descending: true,
            nulls_first: true,
        };

        // Each entry is (input orderings, projection exprs, expected output
        // orderings of the projected equivalence properties).
        let test_cases = vec![
            // ---------- TEST CASE 1 ------------
            (
                // orderings
                vec![
                    // [b ASC]
                    vec![(col_b, option_asc)],
                ],
                // projection exprs
                vec![(col_b, "b_new".to_string()), (col_a, "a_new".to_string())],
                // expected
                vec![
                    // [b_new ASC]
                    vec![("b_new", option_asc)],
                ],
            ),
            // ---------- TEST CASE 2 ------------
            (
                // orderings
                vec![
                    // empty ordering
                ],
                // projection exprs
                vec![(col_c, "c_new".to_string()), (col_b, "b_new".to_string())],
                // expected
                vec![
                    // no ordering at the output
                ],
            ),
            // ---------- TEST CASE 3 ------------
            (
                // orderings
                vec![
                    // [ts ASC]
                    vec![(col_ts, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_ts, "ts_new".to_string()),
                ],
                // expected
                vec![
                    // [ts_new ASC]
                    vec![("ts_new", option_asc)],
                ],
            ),
            // ---------- TEST CASE 4 ------------
            (
                // orderings
                vec![
                    // [a ASC, ts ASC]
                    vec![(col_a, option_asc), (col_ts, option_asc)],
                    // [b ASC, ts ASC]
                    vec![(col_b, option_asc), (col_ts, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_ts, "ts_new".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, ts_new ASC]
                    vec![("a_new", option_asc), ("ts_new", option_asc)],
                    // [b_new ASC, ts_new ASC]
                    vec![("b_new", option_asc), ("ts_new", option_asc)],
                ],
            ),
            // ---------- TEST CASE 5 ------------
            (
                // orderings
                vec![
                    // [a + b ASC]
                    vec![(&a_plus_b, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    // [a + b ASC]
                    vec![("a+b", option_asc)],
                ],
            ),
            // ---------- TEST CASE 6 ------------
            (
                // orderings
                vec![
                    // [a + b ASC, c ASC]
                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
                ],
                // projection exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_c, "c_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    // [a + b ASC, c_new ASC]
                    vec![("a+b", option_asc), ("c_new", option_asc)],
                ],
            ),
            // ------- TEST CASE 7 ----------
            (
                vec![
                    // [a ASC, b ASC]
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    // [a ASC, d ASC]
                    vec![(col_a, option_asc), (col_d, option_asc)],
                ],
                // b as b_new, a as a_new, d as d_new, b+d as b+d
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (&b_plus_d, "b+d".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, b_new ASC]
                    vec![("a_new", option_asc), ("b_new", option_asc)],
                    // [a_new ASC, d_new ASC]
                    vec![("a_new", option_asc), ("d_new", option_asc)],
                    // [a_new ASC, b+d ASC]
                    vec![("a_new", option_asc), ("b+d", option_asc)],
                ],
            ),
            // ------- TEST CASE 8 ----------
            (
                // orderings
                vec![
                    // [b+d ASC]
                    vec![(&b_plus_d, option_asc)],
                ],
                // proj exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (&b_plus_d, "b+d".to_string()),
                ],
                // expected
                vec![
                    // [b+d ASC]
                    vec![("b+d", option_asc)],
                ],
            ),
            // ------- TEST CASE 9 ----------
            (
                // orderings
                vec![
                    // [a ASC, d ASC, b ASC]
                    vec![
                        (col_a, option_asc),
                        (col_d, option_asc),
                        (col_b, option_asc),
                    ],
                    // [c ASC]
                    vec![(col_c, option_asc)],
                ],
                // proj exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (col_c, "c_new".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, d_new ASC, b_new ASC]
                    vec![
                        ("a_new", option_asc),
                        ("d_new", option_asc),
                        ("b_new", option_asc),
                    ],
                    // [c_new ASC],
                    vec![("c_new", option_asc)],
                ],
            ),
            // ------- TEST CASE 10 ----------
            (
                vec![
                    // [a ASC, b ASC, c ASC]
                    vec![
                        (col_a, option_asc),
                        (col_b, option_asc),
                        (col_c, option_asc),
                    ],
                    // [a ASC, d ASC]
                    vec![(col_a, option_asc), (col_d, option_asc)],
                ],
                // proj exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_c, "c_new".to_string()),
                    (&c_plus_d, "c+d".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, b_new ASC, c_new ASC]
                    vec![
                        ("a_new", option_asc),
                        ("b_new", option_asc),
                        ("c_new", option_asc),
                    ],
                    // [a_new ASC, b_new ASC, c+d ASC]
                    vec![
                        ("a_new", option_asc),
                        ("b_new", option_asc),
                        ("c+d", option_asc),
                    ],
                ],
            ),
            // ------- TEST CASE 11 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC]
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    // [a ASC, d ASC]
                    vec![(col_a, option_asc), (col_d, option_asc)],
                ],
                // proj exprs
                vec![
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&b_plus_d, "b+d".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, b_new ASC]
                    vec![("a_new", option_asc), ("b_new", option_asc)],
                    // [a_new ASC, b + d ASC]
                    vec![("a_new", option_asc), ("b+d", option_asc)],
                ],
            ),
            // ------- TEST CASE 12 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC, c ASC]
                    vec![
                        (col_a, option_asc),
                        (col_b, option_asc),
                        (col_c, option_asc),
                    ],
                ],
                // proj exprs
                vec![(col_c, "c_new".to_string()), (col_a, "a_new".to_string())],
                // expected
                vec![
                    // [a_new ASC]
                    vec![("a_new", option_asc)],
                ],
            ),
            // ------- TEST CASE 13 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC, c ASC]
                    vec![
                        (col_a, option_asc),
                        (col_b, option_asc),
                        (col_c, option_asc),
                    ],
                    // [a ASC, a + b ASC, c ASC]
                    vec![
                        (col_a, option_asc),
                        (&a_plus_b, option_asc),
                        (col_c, option_asc),
                    ],
                ],
                // proj exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_b, "b_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, b_new ASC, c_new ASC]
                    vec![
                        ("a_new", option_asc),
                        ("b_new", option_asc),
                        ("c_new", option_asc),
                    ],
                    // [a_new ASC, a+b ASC, c_new ASC]
                    vec![
                        ("a_new", option_asc),
                        ("a+b", option_asc),
                        ("c_new", option_asc),
                    ],
                ],
            ),
            // ------- TEST CASE 14 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC]
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    // [c ASC, b ASC]
                    vec![(col_c, option_asc), (col_b, option_asc)],
                    // [d ASC, e ASC]
                    vec![(col_d, option_asc), (col_e, option_asc)],
                ],
                // proj exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_d, "d_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&b_plus_e, "b+e".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, d_new ASC, b+e ASC]
                    vec![
                        ("a_new", option_asc),
                        ("d_new", option_asc),
                        ("b+e", option_asc),
                    ],
                    // [d_new ASC, a_new ASC, b+e ASC]
                    vec![
                        ("d_new", option_asc),
                        ("a_new", option_asc),
                        ("b+e", option_asc),
                    ],
                    // [c_new ASC, d_new ASC, b+e ASC]
                    vec![
                        ("c_new", option_asc),
                        ("d_new", option_asc),
                        ("b+e", option_asc),
                    ],
                    // [d_new ASC, c_new ASC, b+e ASC]
                    vec![
                        ("d_new", option_asc),
                        ("c_new", option_asc),
                        ("b+e", option_asc),
                    ],
                ],
            ),
            // ------- TEST CASE 15 ----------
            (
                // orderings
                vec![
                    // [a ASC, c ASC, b ASC]
                    vec![
                        (col_a, option_asc),
                        (col_c, option_asc),
                        (col_b, option_asc),
                    ],
                ],
                // proj exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (&a_plus_b, "a+b".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, c_new ASC, a+b ASC]
                    vec![
                        ("a_new", option_asc),
                        ("c_new", option_asc),
                        ("a+b", option_asc),
                    ],
                ],
            ),
            // ------- TEST CASE 16 ----------
            (
                // orderings
                vec![
                    // [a ASC, b ASC]
                    vec![(col_a, option_asc), (col_b, option_asc)],
                    // [c ASC, b DESC]
                    vec![(col_c, option_asc), (col_b, option_desc)],
                    // [e ASC]
                    vec![(col_e, option_asc)],
                ],
                // proj exprs
                vec![
                    (col_c, "c_new".to_string()),
                    (col_a, "a_new".to_string()),
                    (col_b, "b_new".to_string()),
                    (&b_plus_e, "b+e".to_string()),
                ],
                // expected
                vec![
                    // [a_new ASC, b_new ASC]
                    vec![("a_new", option_asc), ("b_new", option_asc)],
                    // [a_new ASC, b+e ASC]
                    vec![("a_new", option_asc), ("b+e", option_asc)],
                    // [c_new ASC, b_new DESC]
                    vec![("c_new", option_asc), ("b_new", option_desc)],
                ],
            ),
        ];

        for (idx, (orderings, proj_exprs, expected)) in test_cases.into_iter().enumerate()
        {
            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

            let orderings = convert_to_orderings(&orderings);
            eq_properties.add_orderings(orderings);

            let proj_exprs = proj_exprs
                .into_iter()
                .map(|(expr, name)| (Arc::clone(expr), name));
            let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
            let output_schema = output_schema(&projection_mapping, &schema)?;

            // Resolve the expected orderings against the *output* schema so
            // that the column indices match the projected side.
            let expected = expected
                .into_iter()
                .map(|ordering| {
                    ordering
                        .into_iter()
                        .map(|(name, options)| {
                            (col(name, &output_schema).unwrap(), options)
                        })
                        .collect::<Vec<_>>()
                })
                .collect::<Vec<_>>();
            let expected = convert_to_orderings(&expected);

            let projected_eq = eq_properties.project(&projection_mapping, output_schema);
            let orderings = projected_eq.oeq_class();

            let err_msg = format!(
                "test_idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
            );

            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
            for expected_ordering in &expected {
                assert!(orderings.contains(expected_ordering), "{}", err_msg)
            }
        }

        Ok(())
    }
1730
    #[test]
    fn project_orderings2() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, true),
            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
        ]));
        let col_a = &col("a", &schema)?;
        let col_b = &col("b", &schema)?;
        let col_c = &col("c", &schema)?;
        let col_ts = &col("ts", &schema)?;
        let a_plus_b = Arc::new(BinaryExpr::new(
            Arc::clone(col_a),
            Operator::Plus,
            Arc::clone(col_b),
        )) as Arc<dyn PhysicalExpr>;

        let test_fun = Arc::new(ScalarUDF::new_from_impl(TestScalarUDF::new()));

        // `round_c` is a (monotonic) scalar-function expression over `c`.
        let round_c = Arc::new(ScalarFunctionExpr::try_new(
            test_fun,
            vec![Arc::clone(col_c)],
            &schema,
            Arc::new(ConfigOptions::default()),
        )?) as PhysicalExprRef;

        let option_asc = SortOptions {
            descending: false,
            nulls_first: false,
        };

        let proj_exprs = vec![
            (col_b, "b_new".to_string()),
            (col_a, "a_new".to_string()),
            (col_c, "c_new".to_string()),
            (&round_c, "round_c_res".to_string()),
        ];
        let proj_exprs = proj_exprs
            .into_iter()
            .map(|(expr, name)| (Arc::clone(expr), name));
        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
        let output_schema = output_schema(&projection_mapping, &schema)?;

        let col_a_new = &col("a_new", &output_schema)?;
        let col_b_new = &col("b_new", &output_schema)?;
        let col_c_new = &col("c_new", &output_schema)?;
        let col_round_c_res = &col("round_c_res", &output_schema)?;
        let a_new_plus_b_new = Arc::new(BinaryExpr::new(
            Arc::clone(col_a_new),
            Operator::Plus,
            Arc::clone(col_b_new),
        )) as Arc<dyn PhysicalExpr>;

        // Each entry is (input orderings, expected output orderings); the
        // projection is the fixed `proj_exprs` above.
        let test_cases = [
            // ---------- TEST CASE 1 ------------
            (
                // orderings
                vec![
                    // [a ASC]
                    vec![(col_a, option_asc)],
                ],
                // expected
                vec![
                    // [a_new ASC]
                    vec![(col_a_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 2 ------------
            (
                // orderings
                vec![
                    // [a+b ASC]
                    vec![(&a_plus_b, option_asc)],
                ],
                // expected
                vec![
                    // [a_new + b_new ASC]
                    vec![(&a_new_plus_b_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 3 ------------
            (
                // orderings
                vec![
                    // [a ASC, ts ASC]
                    vec![(col_a, option_asc), (col_ts, option_asc)],
                ],
                // expected (`ts` is not projected, so only the prefix survives)
                vec![
                    // [a_new ASC]
                    vec![(col_a_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 4 ------------
            (
                // orderings
                vec![
                    // [a ASC, ts ASC, b ASC]
                    vec![
                        (col_a, option_asc),
                        (col_ts, option_asc),
                        (col_b, option_asc),
                    ],
                ],
                // expected (`ts` is not projected, so only the prefix survives)
                vec![
                    // [a_new ASC]
                    vec![(col_a_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 5 ------------
            (
                // orderings
                vec![
                    // [a ASC, c ASC]
                    vec![(col_a, option_asc), (col_c, option_asc)],
                ],
                // expected
                vec![
                    // [a_new ASC, round_c_res ASC]
                    vec![(col_a_new, option_asc), (col_round_c_res, option_asc)],
                    // [a_new ASC, c_new ASC]
                    vec![(col_a_new, option_asc), (col_c_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 6 ------------
            (
                // orderings
                vec![
                    // [c ASC, b ASC]
                    vec![(col_c, option_asc), (col_b, option_asc)],
                ],
                // expected
                vec![
                    // [round_c_res ASC]
                    vec![(col_round_c_res, option_asc)],
                    // [c_new ASC, b_new ASC]
                    vec![(col_c_new, option_asc), (col_b_new, option_asc)],
                ],
            ),
            // ---------- TEST CASE 7 ------------
            (
                // orderings
                vec![
                    // [a+b ASC, c ASC]
                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
                ],
                // expected
                vec![
                    // [a_new+b_new ASC, round_c_res ASC]
                    vec![
                        (&a_new_plus_b_new, option_asc),
                        (col_round_c_res, option_asc),
                    ],
                    // [a_new+b_new ASC, c_new ASC]
                    vec![(&a_new_plus_b_new, option_asc), (col_c_new, option_asc)],
                ],
            ),
        ];

        for (idx, (orderings, expected)) in test_cases.iter().enumerate() {
            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

            let orderings = convert_to_orderings(orderings);
            eq_properties.add_orderings(orderings);

            let expected = convert_to_orderings(expected);

            let projected_eq =
                eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
            let orderings = projected_eq.oeq_class();

            let err_msg = format!(
                "test idx: {idx:?}, actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
            );

            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
            for expected_ordering in &expected {
                assert!(orderings.contains(expected_ordering), "{}", err_msg)
            }
        }
        Ok(())
    }
1916
    #[test]
    fn project_orderings3() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Int32, true),
            Field::new("e", DataType::Int32, true),
            Field::new("f", DataType::Int32, true),
        ]));
        let col_a = &col("a", &schema)?;
        let col_b = &col("b", &schema)?;
        let col_c = &col("c", &schema)?;
        let col_d = &col("d", &schema)?;
        let col_e = &col("e", &schema)?;
        let col_f = &col("f", &schema)?;
        let a_plus_b = Arc::new(BinaryExpr::new(
            Arc::clone(col_a),
            Operator::Plus,
            Arc::clone(col_b),
        )) as Arc<dyn PhysicalExpr>;

        let option_asc = SortOptions {
            descending: false,
            nulls_first: false,
        };

        let proj_exprs = vec![
            (col_c, "c_new".to_string()),
            (col_d, "d_new".to_string()),
            (&a_plus_b, "a+b".to_string()),
        ];
        let proj_exprs = proj_exprs
            .into_iter()
            .map(|(expr, name)| (Arc::clone(expr), name));
        let projection_mapping = ProjectionMapping::try_new(proj_exprs, &schema)?;
        let output_schema = output_schema(&projection_mapping, &schema)?;

        let col_a_plus_b_new = &col("a+b", &output_schema)?;
        let col_c_new = &col("c_new", &output_schema)?;
        let col_d_new = &col("d_new", &output_schema)?;

        // Each entry is (input orderings, equality conditions to register
        // first, expected output orderings after projection).
        let test_cases = vec![
            // ---------- TEST CASE 1 ------------
            (
                // orderings
                vec![
                    // [d ASC, b ASC]
                    vec![(col_d, option_asc), (col_b, option_asc)],
                    // [c ASC, a ASC]
                    vec![(col_c, option_asc), (col_a, option_asc)],
                ],
                // equal conditions
                vec![],
                // expected
                vec![
                    // [d_new ASC, c_new ASC, a+b ASC]
                    vec![
                        (col_d_new, option_asc),
                        (col_c_new, option_asc),
                        (col_a_plus_b_new, option_asc),
                    ],
                    // [c_new ASC, d_new ASC, a+b ASC]
                    vec![
                        (col_c_new, option_asc),
                        (col_d_new, option_asc),
                        (col_a_plus_b_new, option_asc),
                    ],
                ],
            ),
            // ---------- TEST CASE 2 ------------
            (
                // orderings
                vec![
                    // [d ASC, b ASC]
                    vec![(col_d, option_asc), (col_b, option_asc)],
                    // [c ASC, e ASC], Please note that a=e
                    vec![(col_c, option_asc), (col_e, option_asc)],
                ],
                // equal conditions
                vec![(col_e, col_a)],
                // expected
                vec![
                    // [d_new ASC, c_new ASC, a+b ASC]
                    vec![
                        (col_d_new, option_asc),
                        (col_c_new, option_asc),
                        (col_a_plus_b_new, option_asc),
                    ],
                    // [c_new ASC, d_new ASC, a+b ASC]
                    vec![
                        (col_c_new, option_asc),
                        (col_d_new, option_asc),
                        (col_a_plus_b_new, option_asc),
                    ],
                ],
            ),
            // ---------- TEST CASE 3 ------------
            (
                // orderings
                vec![
                    // [d ASC, b ASC]
                    vec![(col_d, option_asc), (col_b, option_asc)],
                    // [c ASC, e ASC], Please note that a=f
                    vec![(col_c, option_asc), (col_e, option_asc)],
                ],
                // equal conditions (f is not projected, so this does not help)
                vec![(col_a, col_f)],
                // expected
                vec![
                    // [d_new ASC]
                    vec![(col_d_new, option_asc)],
                    // [c_new ASC]
                    vec![(col_c_new, option_asc)],
                ],
            ),
        ];
        for (orderings, equal_columns, expected) in test_cases {
            let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
            for (lhs, rhs) in equal_columns {
                eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?;
            }

            let orderings = convert_to_orderings(&orderings);
            eq_properties.add_orderings(orderings);

            let expected = convert_to_orderings(&expected);

            let projected_eq =
                eq_properties.project(&projection_mapping, Arc::clone(&output_schema));
            let orderings = projected_eq.oeq_class();

            let err_msg = format!(
                "actual: {orderings:?}, expected: {expected:?}, projection_mapping: {projection_mapping:?}"
            );

            assert_eq!(orderings.len(), expected.len(), "{err_msg}");
            for expected_ordering in &expected {
                assert!(orderings.contains(expected_ordering), "{}", err_msg)
            }
        }

        Ok(())
    }
2061
2062 fn get_stats() -> Statistics {
2063 Statistics {
2064 num_rows: Precision::Exact(5),
2065 total_byte_size: Precision::Exact(23),
2066 column_statistics: vec![
2067 ColumnStatistics {
2068 distinct_count: Precision::Exact(5),
2069 max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
2070 min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
2071 sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
2072 null_count: Precision::Exact(0),
2073 byte_size: Precision::Absent,
2074 },
2075 ColumnStatistics {
2076 distinct_count: Precision::Exact(1),
2077 max_value: Precision::Exact(ScalarValue::from("x")),
2078 min_value: Precision::Exact(ScalarValue::from("a")),
2079 sum_value: Precision::Absent,
2080 null_count: Precision::Exact(3),
2081 byte_size: Precision::Absent,
2082 },
2083 ColumnStatistics {
2084 distinct_count: Precision::Absent,
2085 max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
2086 min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
2087 sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
2088 null_count: Precision::Absent,
2089 byte_size: Precision::Absent,
2090 },
2091 ],
2092 }
2093 }
2094
2095 fn get_schema() -> Schema {
2096 let field_0 = Field::new("col0", DataType::Int64, false);
2097 let field_1 = Field::new("col1", DataType::Utf8, false);
2098 let field_2 = Field::new("col2", DataType::Float32, false);
2099 Schema::new(vec![field_0, field_1, field_2])
2100 }
2101
2102 #[test]
2103 fn test_stats_projection_columns_only() {
2104 let source = get_stats();
2105 let schema = get_schema();
2106
2107 let projection = ProjectionExprs::new(vec![
2108 ProjectionExpr {
2109 expr: Arc::new(Column::new("col1", 1)),
2110 alias: "col1".to_string(),
2111 },
2112 ProjectionExpr {
2113 expr: Arc::new(Column::new("col0", 0)),
2114 alias: "col0".to_string(),
2115 },
2116 ]);
2117
2118 let result = projection
2119 .project_statistics(source, &projection.project_schema(&schema).unwrap())
2120 .unwrap();
2121
2122 let expected = Statistics {
2123 num_rows: Precision::Exact(5),
2124 // Because there is a variable length Utf8 column we cannot calculate exact byte size after projection
2125 // Thus we set it to Inexact (originally it was Exact(23))
2126 total_byte_size: Precision::Inexact(23),
2127 column_statistics: vec![
2128 ColumnStatistics {
2129 distinct_count: Precision::Exact(1),
2130 max_value: Precision::Exact(ScalarValue::from("x")),
2131 min_value: Precision::Exact(ScalarValue::from("a")),
2132 sum_value: Precision::Absent,
2133 null_count: Precision::Exact(3),
2134 byte_size: Precision::Absent,
2135 },
2136 ColumnStatistics {
2137 distinct_count: Precision::Exact(5),
2138 max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
2139 min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
2140 sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
2141 null_count: Precision::Exact(0),
2142 byte_size: Precision::Absent,
2143 },
2144 ],
2145 };
2146
2147 assert_eq!(result, expected);
2148 }
2149
2150 #[test]
2151 fn test_stats_projection_column_with_primitive_width_only() {
2152 let source = get_stats();
2153 let schema = get_schema();
2154
2155 let projection = ProjectionExprs::new(vec![
2156 ProjectionExpr {
2157 expr: Arc::new(Column::new("col2", 2)),
2158 alias: "col2".to_string(),
2159 },
2160 ProjectionExpr {
2161 expr: Arc::new(Column::new("col0", 0)),
2162 alias: "col0".to_string(),
2163 },
2164 ]);
2165
2166 let result = projection
2167 .project_statistics(source, &projection.project_schema(&schema).unwrap())
2168 .unwrap();
2169
2170 let expected = Statistics {
2171 num_rows: Precision::Exact(5),
2172 total_byte_size: Precision::Exact(60),
2173 column_statistics: vec![
2174 ColumnStatistics {
2175 distinct_count: Precision::Absent,
2176 max_value: Precision::Exact(ScalarValue::Float32(Some(1.1))),
2177 min_value: Precision::Exact(ScalarValue::Float32(Some(0.1))),
2178 sum_value: Precision::Exact(ScalarValue::Float32(Some(5.5))),
2179 null_count: Precision::Absent,
2180 byte_size: Precision::Absent,
2181 },
2182 ColumnStatistics {
2183 distinct_count: Precision::Exact(5),
2184 max_value: Precision::Exact(ScalarValue::Int64(Some(21))),
2185 min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
2186 sum_value: Precision::Exact(ScalarValue::Int64(Some(42))),
2187 null_count: Precision::Exact(0),
2188 byte_size: Precision::Absent,
2189 },
2190 ],
2191 };
2192
2193 assert_eq!(result, expected);
2194 }
2195
2196 // Tests for Projection struct
2197
2198 #[test]
2199 fn test_projection_new() -> Result<()> {
2200 let exprs = vec![
2201 ProjectionExpr {
2202 expr: Arc::new(Column::new("a", 0)),
2203 alias: "a".to_string(),
2204 },
2205 ProjectionExpr {
2206 expr: Arc::new(Column::new("b", 1)),
2207 alias: "b".to_string(),
2208 },
2209 ];
2210 let projection = ProjectionExprs::new(exprs.clone());
2211 assert_eq!(projection.as_ref().len(), 2);
2212 Ok(())
2213 }
2214
2215 #[test]
2216 fn test_projection_from_vec() -> Result<()> {
2217 let exprs = vec![ProjectionExpr {
2218 expr: Arc::new(Column::new("x", 0)),
2219 alias: "x".to_string(),
2220 }];
2221 let projection: ProjectionExprs = exprs.clone().into();
2222 assert_eq!(projection.as_ref().len(), 1);
2223 Ok(())
2224 }
2225
2226 #[test]
2227 fn test_projection_as_ref() -> Result<()> {
2228 let exprs = vec![
2229 ProjectionExpr {
2230 expr: Arc::new(Column::new("col1", 0)),
2231 alias: "col1".to_string(),
2232 },
2233 ProjectionExpr {
2234 expr: Arc::new(Column::new("col2", 1)),
2235 alias: "col2".to_string(),
2236 },
2237 ];
2238 let projection = ProjectionExprs::new(exprs);
2239 let as_ref: &[ProjectionExpr] = projection.as_ref();
2240 assert_eq!(as_ref.len(), 2);
2241 Ok(())
2242 }
2243
2244 #[test]
2245 fn test_column_indices_multiple_columns() -> Result<()> {
2246 // Test with reversed column order to ensure proper reordering
2247 let projection = ProjectionExprs::new(vec![
2248 ProjectionExpr {
2249 expr: Arc::new(Column::new("c", 5)),
2250 alias: "c".to_string(),
2251 },
2252 ProjectionExpr {
2253 expr: Arc::new(Column::new("b", 2)),
2254 alias: "b".to_string(),
2255 },
2256 ProjectionExpr {
2257 expr: Arc::new(Column::new("a", 0)),
2258 alias: "a".to_string(),
2259 },
2260 ]);
2261 // Should return sorted indices regardless of projection order
2262 assert_eq!(projection.column_indices(), vec![0, 2, 5]);
2263 Ok(())
2264 }
2265
2266 #[test]
2267 fn test_column_indices_duplicates() -> Result<()> {
2268 // Test that duplicate column indices appear only once
2269 let projection = ProjectionExprs::new(vec![
2270 ProjectionExpr {
2271 expr: Arc::new(Column::new("a", 1)),
2272 alias: "a".to_string(),
2273 },
2274 ProjectionExpr {
2275 expr: Arc::new(Column::new("b", 3)),
2276 alias: "b".to_string(),
2277 },
2278 ProjectionExpr {
2279 expr: Arc::new(Column::new("a2", 1)), // duplicate index
2280 alias: "a2".to_string(),
2281 },
2282 ]);
2283 assert_eq!(projection.column_indices(), vec![1, 3]);
2284 Ok(())
2285 }
2286
2287 #[test]
2288 fn test_column_indices_unsorted() -> Result<()> {
2289 // Test that column indices are sorted in the output
2290 let projection = ProjectionExprs::new(vec![
2291 ProjectionExpr {
2292 expr: Arc::new(Column::new("c", 5)),
2293 alias: "c".to_string(),
2294 },
2295 ProjectionExpr {
2296 expr: Arc::new(Column::new("a", 1)),
2297 alias: "a".to_string(),
2298 },
2299 ProjectionExpr {
2300 expr: Arc::new(Column::new("b", 3)),
2301 alias: "b".to_string(),
2302 },
2303 ]);
2304 assert_eq!(projection.column_indices(), vec![1, 3, 5]);
2305 Ok(())
2306 }
2307
2308 #[test]
2309 fn test_column_indices_complex_expr() -> Result<()> {
2310 // Test with complex expressions containing multiple columns
2311 let expr = Arc::new(BinaryExpr::new(
2312 Arc::new(Column::new("a", 1)),
2313 Operator::Plus,
2314 Arc::new(Column::new("b", 4)),
2315 ));
2316 let projection = ProjectionExprs::new(vec![
2317 ProjectionExpr {
2318 expr,
2319 alias: "sum".to_string(),
2320 },
2321 ProjectionExpr {
2322 expr: Arc::new(Column::new("c", 2)),
2323 alias: "c".to_string(),
2324 },
2325 ]);
2326 // Should return [1, 2, 4] - all columns used, sorted and deduplicated
2327 assert_eq!(projection.column_indices(), vec![1, 2, 4]);
2328 Ok(())
2329 }
2330
2331 #[test]
2332 fn test_column_indices_empty() -> Result<()> {
2333 let projection = ProjectionExprs::new(vec![]);
2334 assert_eq!(projection.column_indices(), Vec::<usize>::new());
2335 Ok(())
2336 }
2337
2338 #[test]
2339 fn test_merge_simple_columns() -> Result<()> {
2340 // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
2341 let base_projection = ProjectionExprs::new(vec![
2342 ProjectionExpr {
2343 expr: Arc::new(Column::new("c", 2)),
2344 alias: "x".to_string(),
2345 },
2346 ProjectionExpr {
2347 expr: Arc::new(Column::new("b", 1)),
2348 alias: "y".to_string(),
2349 },
2350 ProjectionExpr {
2351 expr: Arc::new(Column::new("a", 0)),
2352 alias: "z".to_string(),
2353 },
2354 ]);
2355
2356 // Second projection: SELECT y@1 AS col2, x@0 AS col1
2357 let top_projection = ProjectionExprs::new(vec![
2358 ProjectionExpr {
2359 expr: Arc::new(Column::new("y", 1)),
2360 alias: "col2".to_string(),
2361 },
2362 ProjectionExpr {
2363 expr: Arc::new(Column::new("x", 0)),
2364 alias: "col1".to_string(),
2365 },
2366 ]);
2367
2368 // Merge should produce: SELECT b@1 AS col2, c@2 AS col1
2369 let merged = base_projection.try_merge(&top_projection)?;
2370 assert_snapshot!(format!("{merged}"), @"Projection[b@1 AS col2, c@2 AS col1]");
2371
2372 Ok(())
2373 }
2374
2375 #[test]
2376 fn test_merge_with_expressions() -> Result<()> {
2377 // First projection: SELECT c@2 AS x, b@1 AS y, a@0 AS z
2378 let base_projection = ProjectionExprs::new(vec![
2379 ProjectionExpr {
2380 expr: Arc::new(Column::new("c", 2)),
2381 alias: "x".to_string(),
2382 },
2383 ProjectionExpr {
2384 expr: Arc::new(Column::new("b", 1)),
2385 alias: "y".to_string(),
2386 },
2387 ProjectionExpr {
2388 expr: Arc::new(Column::new("a", 0)),
2389 alias: "z".to_string(),
2390 },
2391 ]);
2392
2393 // Second projection: SELECT y@1 + z@2 AS c2, x@0 + 1 AS c1
2394 let top_projection = ProjectionExprs::new(vec![
2395 ProjectionExpr {
2396 expr: Arc::new(BinaryExpr::new(
2397 Arc::new(Column::new("y", 1)),
2398 Operator::Plus,
2399 Arc::new(Column::new("z", 2)),
2400 )),
2401 alias: "c2".to_string(),
2402 },
2403 ProjectionExpr {
2404 expr: Arc::new(BinaryExpr::new(
2405 Arc::new(Column::new("x", 0)),
2406 Operator::Plus,
2407 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
2408 )),
2409 alias: "c1".to_string(),
2410 },
2411 ]);
2412
2413 // Merge should produce: SELECT b@1 + a@0 AS c2, c@2 + 1 AS c1
2414 let merged = base_projection.try_merge(&top_projection)?;
2415 assert_snapshot!(format!("{merged}"), @"Projection[b@1 + a@0 AS c2, c@2 + 1 AS c1]");
2416
2417 Ok(())
2418 }
2419
2420 #[test]
2421 fn try_merge_error() {
2422 // Create a base projection
2423 let base = ProjectionExprs::new(vec![
2424 ProjectionExpr {
2425 expr: Arc::new(Column::new("a", 0)),
2426 alias: "x".to_string(),
2427 },
2428 ProjectionExpr {
2429 expr: Arc::new(Column::new("b", 1)),
2430 alias: "y".to_string(),
2431 },
2432 ]);
2433
2434 // Create a top projection that references a non-existent column index
2435 let top = ProjectionExprs::new(vec![ProjectionExpr {
2436 expr: Arc::new(Column::new("z", 5)), // Invalid index
2437 alias: "result".to_string(),
2438 }]);
2439
2440 // Attempt to merge and expect an error
2441 let err_msg = base.try_merge(&top).unwrap_err().to_string();
2442 assert!(
2443 err_msg.contains("Internal error: Column index 5 out of bounds for projected expressions of length 2"),
2444 "Unexpected error message: {err_msg}",
2445 );
2446 }
2447
2448 #[test]
2449 fn test_merge_empty_projection_with_literal() -> Result<()> {
2450 // This test reproduces the issue from roundtrip_empty_projection test
2451 // Query like: SELECT 1 FROM table
2452 // where the file scan needs no columns (empty projection)
2453 // but we project a literal on top
2454
2455 // Empty base projection (no columns needed from file)
2456 let base_projection = ProjectionExprs::new(vec![]);
2457
2458 // Top projection with a literal expression: SELECT 1
2459 let top_projection = ProjectionExprs::new(vec![ProjectionExpr {
2460 expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2461 alias: "Int64(1)".to_string(),
2462 }]);
2463
2464 // This should succeed - literals don't reference columns so they should
2465 // pass through unchanged when merged with an empty projection
2466 let merged = base_projection.try_merge(&top_projection)?;
2467 assert_snapshot!(format!("{merged}"), @"Projection[1 AS Int64(1)]");
2468
2469 Ok(())
2470 }
2471
2472 #[test]
2473 fn test_update_expr_with_literal() -> Result<()> {
2474 // Test that update_expr correctly handles expressions without column references
2475 let literal_expr: Arc<dyn PhysicalExpr> =
2476 Arc::new(Literal::new(ScalarValue::Int64(Some(42))));
2477 let empty_projection: Vec<ProjectionExpr> = vec![];
2478
2479 // Updating a literal with an empty projection should return the literal unchanged
2480 let result = update_expr(&literal_expr, &empty_projection, true)?;
2481 assert!(result.is_some(), "Literal expression should be valid");
2482
2483 let result_expr = result.unwrap();
2484 assert_eq!(
2485 result_expr
2486 .as_any()
2487 .downcast_ref::<Literal>()
2488 .unwrap()
2489 .value(),
2490 &ScalarValue::Int64(Some(42))
2491 );
2492
2493 Ok(())
2494 }
2495
2496 #[test]
2497 fn test_update_expr_with_complex_literal_expr() -> Result<()> {
2498 // Test update_expr with an expression containing both literals and a column
2499 // This tests the case where we have: literal + column
2500 let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
2501 Arc::new(Literal::new(ScalarValue::Int64(Some(10)))),
2502 Operator::Plus,
2503 Arc::new(Column::new("x", 0)),
2504 ));
2505
2506 // Base projection that maps column 0 to a different expression
2507 let base_projection = vec![ProjectionExpr {
2508 expr: Arc::new(Column::new("a", 5)),
2509 alias: "x".to_string(),
2510 }];
2511
2512 // The expression should be updated: 10 + x@0 becomes 10 + a@5
2513 let result = update_expr(&expr, &base_projection, true)?;
2514 assert!(result.is_some(), "Expression should be valid");
2515
2516 let result_expr = result.unwrap();
2517 let binary = result_expr
2518 .as_any()
2519 .downcast_ref::<BinaryExpr>()
2520 .expect("Should be a BinaryExpr");
2521
2522 // Left side should still be the literal
2523 assert!(binary.left().as_any().downcast_ref::<Literal>().is_some());
2524
2525 // Right side should be updated to reference column at index 5
2526 let right_col = binary
2527 .right()
2528 .as_any()
2529 .downcast_ref::<Column>()
2530 .expect("Right should be a Column");
2531 assert_eq!(right_col.index(), 5);
2532
2533 Ok(())
2534 }
2535
2536 #[test]
2537 fn test_project_schema_simple_columns() -> Result<()> {
2538 // Input schema: [col0: Int64, col1: Utf8, col2: Float32]
2539 let input_schema = get_schema();
2540
2541 // Projection: SELECT col2 AS c, col0 AS a
2542 let projection = ProjectionExprs::new(vec![
2543 ProjectionExpr {
2544 expr: Arc::new(Column::new("col2", 2)),
2545 alias: "c".to_string(),
2546 },
2547 ProjectionExpr {
2548 expr: Arc::new(Column::new("col0", 0)),
2549 alias: "a".to_string(),
2550 },
2551 ]);
2552
2553 let output_schema = projection.project_schema(&input_schema)?;
2554
2555 // Should have 2 fields
2556 assert_eq!(output_schema.fields().len(), 2);
2557
2558 // First field should be "c" with Float32 type
2559 assert_eq!(output_schema.field(0).name(), "c");
2560 assert_eq!(output_schema.field(0).data_type(), &DataType::Float32);
2561
2562 // Second field should be "a" with Int64 type
2563 assert_eq!(output_schema.field(1).name(), "a");
2564 assert_eq!(output_schema.field(1).data_type(), &DataType::Int64);
2565
2566 Ok(())
2567 }
2568
2569 #[test]
2570 fn test_project_schema_with_expressions() -> Result<()> {
2571 // Input schema: [col0: Int64, col1: Utf8, col2: Float32]
2572 let input_schema = get_schema();
2573
2574 // Projection: SELECT col0 + 1 AS incremented
2575 let projection = ProjectionExprs::new(vec![ProjectionExpr {
2576 expr: Arc::new(BinaryExpr::new(
2577 Arc::new(Column::new("col0", 0)),
2578 Operator::Plus,
2579 Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2580 )),
2581 alias: "incremented".to_string(),
2582 }]);
2583
2584 let output_schema = projection.project_schema(&input_schema)?;
2585
2586 // Should have 1 field
2587 assert_eq!(output_schema.fields().len(), 1);
2588
2589 // Field should be "incremented" with Int64 type
2590 assert_eq!(output_schema.field(0).name(), "incremented");
2591 assert_eq!(output_schema.field(0).data_type(), &DataType::Int64);
2592
2593 Ok(())
2594 }
2595
2596 #[test]
2597 fn test_project_schema_preserves_metadata() -> Result<()> {
2598 // Create schema with metadata
2599 let mut metadata = HashMap::new();
2600 metadata.insert("key".to_string(), "value".to_string());
2601 let field_with_metadata =
2602 Field::new("col0", DataType::Int64, false).with_metadata(metadata.clone());
2603 let input_schema = Schema::new(vec![
2604 field_with_metadata,
2605 Field::new("col1", DataType::Utf8, false),
2606 ]);
2607
2608 // Projection: SELECT col0 AS renamed
2609 let projection = ProjectionExprs::new(vec![ProjectionExpr {
2610 expr: Arc::new(Column::new("col0", 0)),
2611 alias: "renamed".to_string(),
2612 }]);
2613
2614 let output_schema = projection.project_schema(&input_schema)?;
2615
2616 // Should have 1 field
2617 assert_eq!(output_schema.fields().len(), 1);
2618
2619 // Field should be "renamed" with metadata preserved
2620 assert_eq!(output_schema.field(0).name(), "renamed");
2621 assert_eq!(output_schema.field(0).metadata(), &metadata);
2622
2623 Ok(())
2624 }
2625
2626 #[test]
2627 fn test_project_schema_empty() -> Result<()> {
2628 let input_schema = get_schema();
2629 let projection = ProjectionExprs::new(vec![]);
2630
2631 let output_schema = projection.project_schema(&input_schema)?;
2632
2633 assert_eq!(output_schema.fields().len(), 0);
2634
2635 Ok(())
2636 }
2637
2638 #[test]
2639 fn test_project_statistics_columns_only() -> Result<()> {
2640 let input_stats = get_stats();
2641 let input_schema = get_schema();
2642
2643 // Projection: SELECT col1 AS text, col0 AS num
2644 let projection = ProjectionExprs::new(vec![
2645 ProjectionExpr {
2646 expr: Arc::new(Column::new("col1", 1)),
2647 alias: "text".to_string(),
2648 },
2649 ProjectionExpr {
2650 expr: Arc::new(Column::new("col0", 0)),
2651 alias: "num".to_string(),
2652 },
2653 ]);
2654
2655 let output_stats = projection.project_statistics(
2656 input_stats,
2657 &projection.project_schema(&input_schema)?,
2658 )?;
2659
2660 // Row count should be preserved
2661 assert_eq!(output_stats.num_rows, Precision::Exact(5));
2662
2663 // Should have 2 column statistics (reordered from input)
2664 assert_eq!(output_stats.column_statistics.len(), 2);
2665
2666 // First column (col1 from input)
2667 assert_eq!(
2668 output_stats.column_statistics[0].distinct_count,
2669 Precision::Exact(1)
2670 );
2671 assert_eq!(
2672 output_stats.column_statistics[0].max_value,
2673 Precision::Exact(ScalarValue::from("x"))
2674 );
2675
2676 // Second column (col0 from input)
2677 assert_eq!(
2678 output_stats.column_statistics[1].distinct_count,
2679 Precision::Exact(5)
2680 );
2681 assert_eq!(
2682 output_stats.column_statistics[1].max_value,
2683 Precision::Exact(ScalarValue::Int64(Some(21)))
2684 );
2685
2686 Ok(())
2687 }
2688
2689 #[test]
2690 fn test_project_statistics_with_expressions() -> Result<()> {
2691 let input_stats = get_stats();
2692 let input_schema = get_schema();
2693
2694 // Projection with expression: SELECT col0 + 1 AS incremented, col1 AS text
2695 let projection = ProjectionExprs::new(vec![
2696 ProjectionExpr {
2697 expr: Arc::new(BinaryExpr::new(
2698 Arc::new(Column::new("col0", 0)),
2699 Operator::Plus,
2700 Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2701 )),
2702 alias: "incremented".to_string(),
2703 },
2704 ProjectionExpr {
2705 expr: Arc::new(Column::new("col1", 1)),
2706 alias: "text".to_string(),
2707 },
2708 ]);
2709
2710 let output_stats = projection.project_statistics(
2711 input_stats,
2712 &projection.project_schema(&input_schema)?,
2713 )?;
2714
2715 // Row count should be preserved
2716 assert_eq!(output_stats.num_rows, Precision::Exact(5));
2717
2718 // Should have 2 column statistics
2719 assert_eq!(output_stats.column_statistics.len(), 2);
2720
2721 // First column (expression) should have unknown statistics
2722 assert_eq!(
2723 output_stats.column_statistics[0].distinct_count,
2724 Precision::Absent
2725 );
2726 assert_eq!(
2727 output_stats.column_statistics[0].max_value,
2728 Precision::Absent
2729 );
2730
2731 // Second column (col1) should preserve statistics
2732 assert_eq!(
2733 output_stats.column_statistics[1].distinct_count,
2734 Precision::Exact(1)
2735 );
2736
2737 Ok(())
2738 }
2739
2740 #[test]
2741 fn test_project_statistics_primitive_width_only() -> Result<()> {
2742 let input_stats = get_stats();
2743 let input_schema = get_schema();
2744
2745 // Projection with only primitive width columns: SELECT col2 AS f, col0 AS i
2746 let projection = ProjectionExprs::new(vec![
2747 ProjectionExpr {
2748 expr: Arc::new(Column::new("col2", 2)),
2749 alias: "f".to_string(),
2750 },
2751 ProjectionExpr {
2752 expr: Arc::new(Column::new("col0", 0)),
2753 alias: "i".to_string(),
2754 },
2755 ]);
2756
2757 let output_stats = projection.project_statistics(
2758 input_stats,
2759 &projection.project_schema(&input_schema)?,
2760 )?;
2761
2762 // Row count should be preserved
2763 assert_eq!(output_stats.num_rows, Precision::Exact(5));
2764
2765 // Total byte size should be recalculated for primitive types
2766 // Float32 (4 bytes) + Int64 (8 bytes) = 12 bytes per row, 5 rows = 60 bytes
2767 assert_eq!(output_stats.total_byte_size, Precision::Exact(60));
2768
2769 // Should have 2 column statistics
2770 assert_eq!(output_stats.column_statistics.len(), 2);
2771
2772 Ok(())
2773 }
2774
2775 #[test]
2776 fn test_project_statistics_empty() -> Result<()> {
2777 let input_stats = get_stats();
2778 let input_schema = get_schema();
2779
2780 let projection = ProjectionExprs::new(vec![]);
2781
2782 let output_stats = projection.project_statistics(
2783 input_stats,
2784 &projection.project_schema(&input_schema)?,
2785 )?;
2786
2787 // Row count should be preserved
2788 assert_eq!(output_stats.num_rows, Precision::Exact(5));
2789
2790 // Should have no column statistics
2791 assert_eq!(output_stats.column_statistics.len(), 0);
2792
2793 // Total byte size should be 0 for empty projection
2794 assert_eq!(output_stats.total_byte_size, Precision::Exact(0));
2795
2796 Ok(())
2797 }
2798
2799 // Test statistics calculation for non-null literal (numeric constant)
2800 #[test]
2801 fn test_project_statistics_with_literal() -> Result<()> {
2802 let input_stats = get_stats();
2803 let input_schema = get_schema();
2804
2805 // Projection with literal: SELECT 42 AS constant, col0 AS num
2806 let projection = ProjectionExprs::new(vec![
2807 ProjectionExpr {
2808 expr: Arc::new(Literal::new(ScalarValue::Int64(Some(42)))),
2809 alias: "constant".to_string(),
2810 },
2811 ProjectionExpr {
2812 expr: Arc::new(Column::new("col0", 0)),
2813 alias: "num".to_string(),
2814 },
2815 ]);
2816
2817 let output_stats = projection.project_statistics(
2818 input_stats,
2819 &projection.project_schema(&input_schema)?,
2820 )?;
2821
2822 // Row count should be preserved
2823 assert_eq!(output_stats.num_rows, Precision::Exact(5));
2824
2825 // Should have 2 column statistics
2826 assert_eq!(output_stats.column_statistics.len(), 2);
2827
2828 // First column (literal 42) should have proper constant statistics
2829 assert_eq!(
2830 output_stats.column_statistics[0].min_value,
2831 Precision::Exact(ScalarValue::Int64(Some(42)))
2832 );
2833 assert_eq!(
2834 output_stats.column_statistics[0].max_value,
2835 Precision::Exact(ScalarValue::Int64(Some(42)))
2836 );
2837 assert_eq!(
2838 output_stats.column_statistics[0].distinct_count,
2839 Precision::Exact(1)
2840 );
2841 assert_eq!(
2842 output_stats.column_statistics[0].null_count,
2843 Precision::Exact(0)
2844 );
2845 // Int64 is 8 bytes, 5 rows = 40 bytes
2846 assert_eq!(
2847 output_stats.column_statistics[0].byte_size,
2848 Precision::Exact(40)
2849 );
2850 // For a constant column, sum_value = value * num_rows = 42 * 5 = 210
2851 assert_eq!(
2852 output_stats.column_statistics[0].sum_value,
2853 Precision::Exact(ScalarValue::Int64(Some(210)))
2854 );
2855
2856 // Second column (col0) should preserve statistics
2857 assert_eq!(
2858 output_stats.column_statistics[1].distinct_count,
2859 Precision::Exact(5)
2860 );
2861 assert_eq!(
2862 output_stats.column_statistics[1].max_value,
2863 Precision::Exact(ScalarValue::Int64(Some(21)))
2864 );
2865
2866 Ok(())
2867 }
2868
2869 // Test statistics calculation for NULL literal (constant NULL column)
2870 #[test]
2871 fn test_project_statistics_with_null_literal() -> Result<()> {
2872 let input_stats = get_stats();
2873 let input_schema = get_schema();
2874
2875 // Projection with NULL literal: SELECT NULL AS null_col, col0 AS num
2876 let projection = ProjectionExprs::new(vec![
2877 ProjectionExpr {
2878 expr: Arc::new(Literal::new(ScalarValue::Int64(None))),
2879 alias: "null_col".to_string(),
2880 },
2881 ProjectionExpr {
2882 expr: Arc::new(Column::new("col0", 0)),
2883 alias: "num".to_string(),
2884 },
2885 ]);
2886
2887 let output_stats = projection.project_statistics(
2888 input_stats,
2889 &projection.project_schema(&input_schema)?,
2890 )?;
2891
2892 // Row count should be preserved
2893 assert_eq!(output_stats.num_rows, Precision::Exact(5));
2894
2895 // Should have 2 column statistics
2896 assert_eq!(output_stats.column_statistics.len(), 2);
2897
2898 // First column (NULL literal) should have proper constant NULL statistics
2899 assert_eq!(
2900 output_stats.column_statistics[0].min_value,
2901 Precision::Exact(ScalarValue::Int64(None))
2902 );
2903 assert_eq!(
2904 output_stats.column_statistics[0].max_value,
2905 Precision::Exact(ScalarValue::Int64(None))
2906 );
2907 assert_eq!(
2908 output_stats.column_statistics[0].distinct_count,
2909 Precision::Exact(1) // All NULLs are considered the same
2910 );
2911 assert_eq!(
2912 output_stats.column_statistics[0].null_count,
2913 Precision::Exact(5) // All rows are NULL
2914 );
2915 assert_eq!(
2916 output_stats.column_statistics[0].byte_size,
2917 Precision::Exact(0)
2918 );
2919 assert_eq!(
2920 output_stats.column_statistics[0].sum_value,
2921 Precision::Exact(ScalarValue::Int64(None))
2922 );
2923
2924 // Second column (col0) should preserve statistics
2925 assert_eq!(
2926 output_stats.column_statistics[1].distinct_count,
2927 Precision::Exact(5)
2928 );
2929 assert_eq!(
2930 output_stats.column_statistics[1].max_value,
2931 Precision::Exact(ScalarValue::Int64(Some(21)))
2932 );
2933
2934 Ok(())
2935 }
2936
2937 // Test statistics calculation for complex type literal (e.g., Utf8 string)
2938 #[test]
2939 fn test_project_statistics_with_complex_type_literal() -> Result<()> {
2940 let input_stats = get_stats();
2941 let input_schema = get_schema();
2942
2943 // Projection with Utf8 literal (complex type): SELECT 'hello' AS text, col0 AS num
2944 let projection = ProjectionExprs::new(vec![
2945 ProjectionExpr {
2946 expr: Arc::new(Literal::new(ScalarValue::Utf8(Some(
2947 "hello".to_string(),
2948 )))),
2949 alias: "text".to_string(),
2950 },
2951 ProjectionExpr {
2952 expr: Arc::new(Column::new("col0", 0)),
2953 alias: "num".to_string(),
2954 },
2955 ]);
2956
2957 let output_stats = projection.project_statistics(
2958 input_stats,
2959 &projection.project_schema(&input_schema)?,
2960 )?;
2961
2962 // Row count should be preserved
2963 assert_eq!(output_stats.num_rows, Precision::Exact(5));
2964
2965 // Should have 2 column statistics
2966 assert_eq!(output_stats.column_statistics.len(), 2);
2967
2968 // First column (Utf8 literal 'hello') should have proper constant statistics
2969 // but byte_size should be Absent for complex types
2970 assert_eq!(
2971 output_stats.column_statistics[0].min_value,
2972 Precision::Exact(ScalarValue::Utf8(Some("hello".to_string())))
2973 );
2974 assert_eq!(
2975 output_stats.column_statistics[0].max_value,
2976 Precision::Exact(ScalarValue::Utf8(Some("hello".to_string())))
2977 );
2978 assert_eq!(
2979 output_stats.column_statistics[0].distinct_count,
2980 Precision::Exact(1)
2981 );
2982 assert_eq!(
2983 output_stats.column_statistics[0].null_count,
2984 Precision::Exact(0)
2985 );
2986 // Complex types (Utf8, List, etc.) should have byte_size = Absent
2987 // because we can't calculate exact size without knowing the actual data
2988 assert_eq!(
2989 output_stats.column_statistics[0].byte_size,
2990 Precision::Absent
2991 );
2992 // Non-numeric types (Utf8) should have sum_value = Absent
2993 // because sum is only meaningful for numeric types
2994 assert_eq!(
2995 output_stats.column_statistics[0].sum_value,
2996 Precision::Absent
2997 );
2998
2999 // Second column (col0) should preserve statistics
3000 assert_eq!(
3001 output_stats.column_statistics[1].distinct_count,
3002 Precision::Exact(5)
3003 );
3004 assert_eq!(
3005 output_stats.column_statistics[1].max_value,
3006 Precision::Exact(ScalarValue::Int64(Some(21)))
3007 );
3008
3009 Ok(())
3010 }
3011}