datafusion_physical_expr/
physical_expr.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::expressions::{self, Column};
21use crate::{create_physical_expr, LexOrdering, PhysicalSortExpr};
22
23use arrow::compute::SortOptions;
24use arrow::datatypes::{Schema, SchemaRef};
25use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
26use datafusion_common::{plan_err, Result};
27use datafusion_common::{DFSchema, HashMap};
28use datafusion_expr::execution_props::ExecutionProps;
29use datafusion_expr::{Expr, SortExpr};
30
31use itertools::izip;
32// Exports:
33pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
34
35/// Adds the `offset` value to `Column` indices inside `expr`. This function is
36/// generally used during the update of the right table schema in join operations.
37pub fn add_offset_to_expr(
38    expr: Arc<dyn PhysicalExpr>,
39    offset: isize,
40) -> Result<Arc<dyn PhysicalExpr>> {
41    expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
42        Some(col) => {
43            let Some(idx) = col.index().checked_add_signed(offset) else {
44                return plan_err!("Column index overflow");
45            };
46            Ok(Transformed::yes(Arc::new(Column::new(col.name(), idx))))
47        }
48        None => Ok(Transformed::no(e)),
49    })
50    .data()
51}
52
53/// This function is similar to the `contains` method of `Vec`. It finds
54/// whether `expr` is among `physical_exprs`.
55pub fn physical_exprs_contains(
56    physical_exprs: &[Arc<dyn PhysicalExpr>],
57    expr: &Arc<dyn PhysicalExpr>,
58) -> bool {
59    physical_exprs
60        .iter()
61        .any(|physical_expr| physical_expr.eq(expr))
62}
63
64/// Checks whether the given physical expression slices are equal.
65pub fn physical_exprs_equal(
66    lhs: &[Arc<dyn PhysicalExpr>],
67    rhs: &[Arc<dyn PhysicalExpr>],
68) -> bool {
69    lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
70}
71
72/// Checks whether the given physical expression slices are equal in the sense
73/// of bags (multi-sets), disregarding their orderings.
74pub fn physical_exprs_bag_equal(
75    lhs: &[Arc<dyn PhysicalExpr>],
76    rhs: &[Arc<dyn PhysicalExpr>],
77) -> bool {
78    let mut multi_set_lhs: HashMap<_, usize> = HashMap::new();
79    let mut multi_set_rhs: HashMap<_, usize> = HashMap::new();
80    for expr in lhs {
81        *multi_set_lhs.entry(expr).or_insert(0) += 1;
82    }
83    for expr in rhs {
84        *multi_set_rhs.entry(expr).or_insert(0) += 1;
85    }
86    multi_set_lhs == multi_set_rhs
87}
88
89/// Converts logical sort expressions to physical sort expressions.
90///
91/// This function transforms a collection of logical sort expressions into their
92/// physical representation that can be used during query execution.
93///
94/// # Arguments
95///
96/// * `schema` - The schema containing column definitions.
97/// * `sort_order` - A collection of logical sort expressions grouped into
98///   lexicographic orderings.
99///
100/// # Returns
101///
102/// A vector of lexicographic orderings for physical execution, or an error if
103/// the transformation fails.
104///
105/// # Examples
106///
107/// ```
108/// // Create orderings from columns "id" and "name"
109/// # use arrow::datatypes::{Schema, Field, DataType};
110/// # use datafusion_physical_expr::create_ordering;
111/// # use datafusion_common::Column;
112/// # use datafusion_expr::{Expr, SortExpr};
113/// #
114/// // Create a schema with two fields
115/// let schema = Schema::new(vec![
116///     Field::new("id", DataType::Int32, false),
117///     Field::new("name", DataType::Utf8, false),
118/// ]);
119///
120/// let sort_exprs = vec![
121///     vec![SortExpr {
122///         expr: Expr::Column(Column::new(Some("t"), "id")),
123///         asc: true,
124///         nulls_first: false,
125///     }],
126///     vec![SortExpr {
127///         expr: Expr::Column(Column::new(Some("t"), "name")),
128///         asc: false,
129///         nulls_first: true,
130///     }],
131/// ];
132/// let result = create_ordering(&schema, &sort_exprs).unwrap();
133/// ```
134pub fn create_ordering(
135    schema: &Schema,
136    sort_order: &[Vec<SortExpr>],
137) -> Result<Vec<LexOrdering>> {
138    let mut all_sort_orders = vec![];
139
140    for (group_idx, exprs) in sort_order.iter().enumerate() {
141        // Construct PhysicalSortExpr objects from Expr objects:
142        let mut sort_exprs = vec![];
143        for (expr_idx, sort) in exprs.iter().enumerate() {
144            match &sort.expr {
145                Expr::Column(col) => match expressions::col(&col.name, schema) {
146                    Ok(expr) => {
147                        let opts = SortOptions::new(!sort.asc, sort.nulls_first);
148                        sort_exprs.push(PhysicalSortExpr::new(expr, opts));
149                    }
150                    // Cannot find expression in the projected_schema, stop iterating
151                    // since rest of the orderings are violated
152                    Err(_) => break,
153                },
154                expr => {
155                    return plan_err!(
156                        "Expected single column reference in sort_order[{}][{}], got {}",
157                        group_idx,
158                        expr_idx,
159                        expr
160                    );
161                }
162            }
163        }
164        all_sort_orders.extend(LexOrdering::new(sort_exprs));
165    }
166    Ok(all_sort_orders)
167}
168
169/// Creates a vector of [LexOrdering] from a vector of logical expression
170pub fn create_lex_ordering(
171    schema: &SchemaRef,
172    sort_order: &[Vec<SortExpr>],
173    execution_props: &ExecutionProps,
174) -> Result<Vec<LexOrdering>> {
175    // Try the fast path that only supports column references first
176    // This avoids creating a DFSchema
177    if let Ok(ordering) = create_ordering(schema, sort_order) {
178        return Ok(ordering);
179    }
180
181    let df_schema = DFSchema::try_from(Arc::clone(schema))?;
182
183    let mut all_sort_orders = vec![];
184
185    for exprs in sort_order.iter() {
186        all_sort_orders.extend(LexOrdering::new(create_physical_sort_exprs(
187            exprs,
188            &df_schema,
189            execution_props,
190        )?));
191    }
192    Ok(all_sort_orders)
193}
194
195/// Create a physical sort expression from a logical expression
196pub fn create_physical_sort_expr(
197    e: &SortExpr,
198    input_dfschema: &DFSchema,
199    execution_props: &ExecutionProps,
200) -> Result<PhysicalSortExpr> {
201    create_physical_expr(&e.expr, input_dfschema, execution_props).map(|expr| {
202        let options = SortOptions::new(!e.asc, e.nulls_first);
203        PhysicalSortExpr::new(expr, options)
204    })
205}
206
207/// Create vector of physical sort expression from a vector of logical expression
208pub fn create_physical_sort_exprs(
209    exprs: &[SortExpr],
210    input_dfschema: &DFSchema,
211    execution_props: &ExecutionProps,
212) -> Result<Vec<PhysicalSortExpr>> {
213    exprs
214        .iter()
215        .map(|e| create_physical_sort_expr(e, input_dfschema, execution_props))
216        .collect()
217}
218
219pub fn add_offset_to_physical_sort_exprs(
220    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
221    offset: isize,
222) -> Result<Vec<PhysicalSortExpr>> {
223    sort_exprs
224        .into_iter()
225        .map(|mut sort_expr| {
226            sort_expr.expr = add_offset_to_expr(sort_expr.expr, offset)?;
227            Ok(sort_expr)
228        })
229        .collect()
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    use crate::expressions::{BinaryExpr, Column, Literal};
    use crate::physical_expr::{
        physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
    };
    use datafusion_physical_expr_common::physical_expr::is_volatile;

    use arrow::datatypes::{DataType, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::ColumnarValue;
    use datafusion_expr::Operator;
    use std::any::Any;
    use std::fmt;

    /// Wraps `value` in a [`Literal`] behind a `PhysicalExpr` trait object.
    fn literal_expr(value: ScalarValue) -> Arc<dyn PhysicalExpr> {
        Arc::new(Literal::new(value))
    }

    /// Boolean literal expression.
    fn bool_lit(value: bool) -> Arc<dyn PhysicalExpr> {
        literal_expr(ScalarValue::Boolean(Some(value)))
    }

    /// `Int32` literal expression.
    fn int32_lit(value: i32) -> Arc<dyn PhysicalExpr> {
        literal_expr(ScalarValue::Int32(Some(value)))
    }

    /// Column expression with the given name and index.
    fn column_expr(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
        Arc::new(Column::new(name, index))
    }

    #[test]
    fn test_physical_exprs_contains() {
        let lit_true = bool_lit(true);
        let lit_false = bool_lit(false);
        let lit4 = int32_lit(4);
        let lit2 = int32_lit(2);
        let lit1 = int32_lit(1);
        let col_a_expr = column_expr("a", 0);
        let col_b_expr = column_expr("b", 1);
        let col_c_expr = column_expr("c", 2);

        // lit(true), lit(false), lit(4), lit(2), Col(a), Col(b)
        let physical_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::clone(&lit_true),
            Arc::clone(&lit_false),
            Arc::clone(&lit4),
            Arc::clone(&lit2),
            Arc::clone(&col_a_expr),
            Arc::clone(&col_b_expr),
        ];

        // These expressions are inside `physical_exprs`:
        for present in [&lit_true, &lit2, &col_b_expr] {
            assert!(physical_exprs_contains(&physical_exprs, present));
        }

        // These expressions are not inside `physical_exprs`:
        for absent in [&col_c_expr, &lit1] {
            assert!(!physical_exprs_contains(&physical_exprs, absent));
        }
    }

    #[test]
    fn test_physical_exprs_equal() {
        let lit_true = bool_lit(true);
        let lit_false = bool_lit(false);
        let lit1 = int32_lit(1);
        let lit2 = int32_lit(2);
        let col_b_expr = column_expr("b", 1);

        let vec1 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
        let vec2 = vec![Arc::clone(&lit_true), Arc::clone(&col_b_expr)];
        let vec3 = vec![Arc::clone(&lit2), Arc::clone(&lit1)];
        let vec4 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];

        // These vectors are the same, both positionally and as bags:
        for same in [&vec1, &vec4] {
            assert!(physical_exprs_equal(&vec1, same));
            assert!(physical_exprs_bag_equal(&vec1, same));
        }

        // These vectors are different, both positionally and as bags:
        for different in [&vec2, &vec3] {
            assert!(!physical_exprs_equal(&vec1, different));
            assert!(!physical_exprs_bag_equal(&vec1, different));
        }
    }

    #[test]
    fn test_physical_exprs_set_equal() {
        // Same distinct columns, but with different multiplicities.
        let list1 = vec![
            column_expr("a", 0),
            column_expr("a", 0),
            column_expr("b", 1),
        ];
        let list2 = vec![
            column_expr("b", 1),
            column_expr("b", 1),
            column_expr("a", 0),
        ];
        assert!(!physical_exprs_bag_equal(&list1, &list2));
        assert!(!physical_exprs_bag_equal(&list2, &list1));
        assert!(!physical_exprs_equal(&list1, &list2));
        assert!(!physical_exprs_equal(&list2, &list1));

        // Same multiplicities, but in a different order.
        let list3 = vec![
            column_expr("a", 0),
            column_expr("b", 1),
            column_expr("c", 2),
            column_expr("a", 0),
            column_expr("b", 1),
        ];
        let list4 = vec![
            column_expr("b", 1),
            column_expr("b", 1),
            column_expr("a", 0),
            column_expr("c", 2),
            column_expr("a", 0),
        ];
        assert!(physical_exprs_bag_equal(&list3, &list4));
        assert!(physical_exprs_bag_equal(&list4, &list3));
        assert!(physical_exprs_bag_equal(&list3, &list3));
        assert!(physical_exprs_bag_equal(&list4, &list4));
        assert!(!physical_exprs_equal(&list3, &list4));
        assert!(!physical_exprs_equal(&list4, &list3));
        // Positional equality must also hold for a list compared with itself,
        // even when the list contains duplicates.
        assert!(physical_exprs_equal(&list3, &list3));
        assert!(physical_exprs_equal(&list4, &list4));
    }

    #[test]
    fn test_is_volatile_default_behavior() {
        // `Literal` and `Column` are expected to report themselves as
        // non-volatile.
        let literal = int32_lit(42);
        let column = column_expr("test", 0);

        // Node-level check
        assert!(!literal.is_volatile_node());
        assert!(!column.is_volatile_node());

        // Whole-expression check
        assert!(!is_volatile(&literal));
        assert!(!is_volatile(&column));
    }

    /// Mock `PhysicalExpr` whose node-level volatility is configurable, for
    /// testing purposes.
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    struct MockVolatileExpr {
        volatile: bool,
    }

    impl MockVolatileExpr {
        fn new(volatile: bool) -> Self {
            Self { volatile }
        }
    }

    impl fmt::Display for MockVolatileExpr {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "MockVolatile({})", self.volatile)
        }
    }

    impl PhysicalExpr for MockVolatileExpr {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
            Ok(DataType::Boolean)
        }

        fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
            Ok(false)
        }

        // Evaluates to a boolean scalar holding the configured flag.
        fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
            Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(
                self.volatile,
            ))))
        }

        // Leaf expression: it has no children.
        fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            _children: Vec<Arc<dyn PhysicalExpr>>,
        ) -> Result<Arc<dyn PhysicalExpr>> {
            Ok(self)
        }

        fn is_volatile_node(&self) -> bool {
            self.volatile
        }

        fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "mock_volatile({})", self.volatile)
        }
    }

    #[test]
    fn test_nested_expression_volatility() {
        // `is_volatile()` must detect volatility anywhere in an expression
        // tree, not only at its root.

        // A volatile mock expression
        let volatile_expr: Arc<dyn PhysicalExpr> = Arc::new(MockVolatileExpr::new(true));
        assert!(volatile_expr.is_volatile_node());
        assert!(is_volatile(&volatile_expr));

        // A non-volatile mock expression
        let stable_expr: Arc<dyn PhysicalExpr> = Arc::new(MockVolatileExpr::new(false));
        assert!(!stable_expr.is_volatile_node());
        assert!(!is_volatile(&stable_expr));

        // A literal (non-volatile)
        let literal = int32_lit(42);
        assert!(!literal.is_volatile_node());
        assert!(!is_volatile(&literal));

        // Composite expression: volatile_expr AND literal. The `BinaryExpr`
        // node itself is not volatile, but it has a volatile child.
        let composite_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::clone(&volatile_expr),
            Operator::And,
            Arc::clone(&literal),
        ));
        assert!(!composite_expr.is_volatile_node());
        assert!(is_volatile(&composite_expr));

        // Composite expression whose children are all non-volatile.
        let stable_composite: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::clone(&stable_expr),
            Operator::And,
            Arc::clone(&literal),
        ));
        assert!(!stable_composite.is_volatile_node());
        assert!(!is_volatile(&stable_composite));
    }
}
477}