datafusion_physical_expr/
physical_expr.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::expressions::{self, Column};
21use crate::{create_physical_expr, LexOrdering, PhysicalSortExpr};
22
23use arrow::compute::SortOptions;
24use arrow::datatypes::Schema;
25use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
26use datafusion_common::{plan_err, Result};
27use datafusion_common::{DFSchema, HashMap};
28use datafusion_expr::execution_props::ExecutionProps;
29use datafusion_expr::{Expr, SortExpr};
30
31use itertools::izip;
32
33// Exports:
34pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
35
36/// Adds the `offset` value to `Column` indices inside `expr`. This function is
37/// generally used during the update of the right table schema in join operations.
38pub fn add_offset_to_expr(
39    expr: Arc<dyn PhysicalExpr>,
40    offset: isize,
41) -> Result<Arc<dyn PhysicalExpr>> {
42    expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
43        Some(col) => {
44            let Some(idx) = col.index().checked_add_signed(offset) else {
45                return plan_err!("Column index overflow");
46            };
47            Ok(Transformed::yes(Arc::new(Column::new(col.name(), idx))))
48        }
49        None => Ok(Transformed::no(e)),
50    })
51    .data()
52}
53
54/// This function is similar to the `contains` method of `Vec`. It finds
55/// whether `expr` is among `physical_exprs`.
56pub fn physical_exprs_contains(
57    physical_exprs: &[Arc<dyn PhysicalExpr>],
58    expr: &Arc<dyn PhysicalExpr>,
59) -> bool {
60    physical_exprs
61        .iter()
62        .any(|physical_expr| physical_expr.eq(expr))
63}
64
65/// Checks whether the given physical expression slices are equal.
66pub fn physical_exprs_equal(
67    lhs: &[Arc<dyn PhysicalExpr>],
68    rhs: &[Arc<dyn PhysicalExpr>],
69) -> bool {
70    lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
71}
72
73/// Checks whether the given physical expression slices are equal in the sense
74/// of bags (multi-sets), disregarding their orderings.
75pub fn physical_exprs_bag_equal(
76    lhs: &[Arc<dyn PhysicalExpr>],
77    rhs: &[Arc<dyn PhysicalExpr>],
78) -> bool {
79    let mut multi_set_lhs: HashMap<_, usize> = HashMap::new();
80    let mut multi_set_rhs: HashMap<_, usize> = HashMap::new();
81    for expr in lhs {
82        *multi_set_lhs.entry(expr).or_insert(0) += 1;
83    }
84    for expr in rhs {
85        *multi_set_rhs.entry(expr).or_insert(0) += 1;
86    }
87    multi_set_lhs == multi_set_rhs
88}
89
90/// Converts logical sort expressions to physical sort expressions.
91///
92/// This function transforms a collection of logical sort expressions into their
93/// physical representation that can be used during query execution.
94///
95/// # Arguments
96///
97/// * `schema` - The schema containing column definitions.
98/// * `sort_order` - A collection of logical sort expressions grouped into
99///   lexicographic orderings.
100///
101/// # Returns
102///
103/// A vector of lexicographic orderings for physical execution, or an error if
104/// the transformation fails.
105///
106/// # Examples
107///
108/// ```
109/// // Create orderings from columns "id" and "name"
110/// # use arrow::datatypes::{Schema, Field, DataType};
111/// # use datafusion_physical_expr::create_ordering;
112/// # use datafusion_common::Column;
113/// # use datafusion_expr::{Expr, SortExpr};
114/// #
115/// // Create a schema with two fields
116/// let schema = Schema::new(vec![
117///     Field::new("id", DataType::Int32, false),
118///     Field::new("name", DataType::Utf8, false),
119/// ]);
120///
121/// let sort_exprs = vec![
122///     vec![
123///         SortExpr { expr: Expr::Column(Column::new(Some("t"), "id")), asc: true, nulls_first: false }
124///     ],
125///     vec![
126///         SortExpr { expr: Expr::Column(Column::new(Some("t"), "name")), asc: false, nulls_first: true }
127///     ]
128/// ];
129/// let result = create_ordering(&schema, &sort_exprs).unwrap();
130/// ```
131pub fn create_ordering(
132    schema: &Schema,
133    sort_order: &[Vec<SortExpr>],
134) -> Result<Vec<LexOrdering>> {
135    let mut all_sort_orders = vec![];
136
137    for (group_idx, exprs) in sort_order.iter().enumerate() {
138        // Construct PhysicalSortExpr objects from Expr objects:
139        let mut sort_exprs = vec![];
140        for (expr_idx, sort) in exprs.iter().enumerate() {
141            match &sort.expr {
142                Expr::Column(col) => match expressions::col(&col.name, schema) {
143                    Ok(expr) => {
144                        let opts = SortOptions::new(!sort.asc, sort.nulls_first);
145                        sort_exprs.push(PhysicalSortExpr::new(expr, opts));
146                    }
147                    // Cannot find expression in the projected_schema, stop iterating
148                    // since rest of the orderings are violated
149                    Err(_) => break,
150                },
151                expr => {
152                    return plan_err!(
153                        "Expected single column reference in sort_order[{}][{}], got {}",
154                        group_idx,
155                        expr_idx,
156                        expr
157                    );
158                }
159            }
160        }
161        all_sort_orders.extend(LexOrdering::new(sort_exprs));
162    }
163    Ok(all_sort_orders)
164}
165
166/// Create a physical sort expression from a logical expression
167pub fn create_physical_sort_expr(
168    e: &SortExpr,
169    input_dfschema: &DFSchema,
170    execution_props: &ExecutionProps,
171) -> Result<PhysicalSortExpr> {
172    create_physical_expr(&e.expr, input_dfschema, execution_props).map(|expr| {
173        let options = SortOptions::new(!e.asc, e.nulls_first);
174        PhysicalSortExpr::new(expr, options)
175    })
176}
177
178/// Create vector of physical sort expression from a vector of logical expression
179pub fn create_physical_sort_exprs(
180    exprs: &[SortExpr],
181    input_dfschema: &DFSchema,
182    execution_props: &ExecutionProps,
183) -> Result<Vec<PhysicalSortExpr>> {
184    exprs
185        .iter()
186        .map(|e| create_physical_sort_expr(e, input_dfschema, execution_props))
187        .collect()
188}
189
190pub fn add_offset_to_physical_sort_exprs(
191    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
192    offset: isize,
193) -> Result<Vec<PhysicalSortExpr>> {
194    sort_exprs
195        .into_iter()
196        .map(|mut sort_expr| {
197            sort_expr.expr = add_offset_to_expr(sort_expr.expr, offset)?;
198            Ok(sort_expr)
199        })
200        .collect()
201}
202
203#[cfg(test)]
204mod tests {
205    use super::*;
206
207    use crate::expressions::{BinaryExpr, Column, Literal};
208    use crate::physical_expr::{
209        physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
210    };
211    use datafusion_physical_expr_common::physical_expr::is_volatile;
212
213    use arrow::datatypes::{DataType, Schema};
214    use arrow::record_batch::RecordBatch;
215    use datafusion_common::{Result, ScalarValue};
216    use datafusion_expr::ColumnarValue;
217    use datafusion_expr::Operator;
218    use std::any::Any;
219    use std::fmt;
220
221    #[test]
222    fn test_physical_exprs_contains() {
223        let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
224            as Arc<dyn PhysicalExpr>;
225        let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
226            as Arc<dyn PhysicalExpr>;
227        let lit4 =
228            Arc::new(Literal::new(ScalarValue::Int32(Some(4)))) as Arc<dyn PhysicalExpr>;
229        let lit2 =
230            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
231        let lit1 =
232            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
233        let col_a_expr = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
234        let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
235        let col_c_expr = Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>;
236
237        // lit(true), lit(false), lit(4), lit(2), Col(a), Col(b)
238        let physical_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
239            Arc::clone(&lit_true),
240            Arc::clone(&lit_false),
241            Arc::clone(&lit4),
242            Arc::clone(&lit2),
243            Arc::clone(&col_a_expr),
244            Arc::clone(&col_b_expr),
245        ];
246        // below expressions are inside physical_exprs
247        assert!(physical_exprs_contains(&physical_exprs, &lit_true));
248        assert!(physical_exprs_contains(&physical_exprs, &lit2));
249        assert!(physical_exprs_contains(&physical_exprs, &col_b_expr));
250
251        // below expressions are not inside physical_exprs
252        assert!(!physical_exprs_contains(&physical_exprs, &col_c_expr));
253        assert!(!physical_exprs_contains(&physical_exprs, &lit1));
254    }
255
256    #[test]
257    fn test_physical_exprs_equal() {
258        let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
259            as Arc<dyn PhysicalExpr>;
260        let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
261            as Arc<dyn PhysicalExpr>;
262        let lit1 =
263            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
264        let lit2 =
265            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
266        let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
267
268        let vec1 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
269        let vec2 = vec![Arc::clone(&lit_true), Arc::clone(&col_b_expr)];
270        let vec3 = vec![Arc::clone(&lit2), Arc::clone(&lit1)];
271        let vec4 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
272
273        // these vectors are same
274        assert!(physical_exprs_equal(&vec1, &vec1));
275        assert!(physical_exprs_equal(&vec1, &vec4));
276        assert!(physical_exprs_bag_equal(&vec1, &vec1));
277        assert!(physical_exprs_bag_equal(&vec1, &vec4));
278
279        // these vectors are different
280        assert!(!physical_exprs_equal(&vec1, &vec2));
281        assert!(!physical_exprs_equal(&vec1, &vec3));
282        assert!(!physical_exprs_bag_equal(&vec1, &vec2));
283        assert!(!physical_exprs_bag_equal(&vec1, &vec3));
284    }
285
286    #[test]
287    fn test_physical_exprs_set_equal() {
288        let list1: Vec<Arc<dyn PhysicalExpr>> = vec![
289            Arc::new(Column::new("a", 0)),
290            Arc::new(Column::new("a", 0)),
291            Arc::new(Column::new("b", 1)),
292        ];
293        let list2: Vec<Arc<dyn PhysicalExpr>> = vec![
294            Arc::new(Column::new("b", 1)),
295            Arc::new(Column::new("b", 1)),
296            Arc::new(Column::new("a", 0)),
297        ];
298        assert!(!physical_exprs_bag_equal(
299            list1.as_slice(),
300            list2.as_slice()
301        ));
302        assert!(!physical_exprs_bag_equal(
303            list2.as_slice(),
304            list1.as_slice()
305        ));
306        assert!(!physical_exprs_equal(list1.as_slice(), list2.as_slice()));
307        assert!(!physical_exprs_equal(list2.as_slice(), list1.as_slice()));
308
309        let list3: Vec<Arc<dyn PhysicalExpr>> = vec![
310            Arc::new(Column::new("a", 0)),
311            Arc::new(Column::new("b", 1)),
312            Arc::new(Column::new("c", 2)),
313            Arc::new(Column::new("a", 0)),
314            Arc::new(Column::new("b", 1)),
315        ];
316        let list4: Vec<Arc<dyn PhysicalExpr>> = vec![
317            Arc::new(Column::new("b", 1)),
318            Arc::new(Column::new("b", 1)),
319            Arc::new(Column::new("a", 0)),
320            Arc::new(Column::new("c", 2)),
321            Arc::new(Column::new("a", 0)),
322        ];
323        assert!(physical_exprs_bag_equal(list3.as_slice(), list4.as_slice()));
324        assert!(physical_exprs_bag_equal(list4.as_slice(), list3.as_slice()));
325        assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice()));
326        assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice()));
327        assert!(!physical_exprs_equal(list3.as_slice(), list4.as_slice()));
328        assert!(!physical_exprs_equal(list4.as_slice(), list3.as_slice()));
329        assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice()));
330        assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice()));
331    }
332
333    #[test]
334    fn test_is_volatile_default_behavior() {
335        // Test that default PhysicalExpr implementations are not volatile
336        let literal =
337            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
338        let column = Arc::new(Column::new("test", 0)) as Arc<dyn PhysicalExpr>;
339
340        // Test is_volatile_node() - should return false by default
341        assert!(!literal.is_volatile_node());
342        assert!(!column.is_volatile_node());
343
344        // Test is_volatile() - should return false for non-volatile expressions
345        assert!(!is_volatile(&literal));
346        assert!(!is_volatile(&column));
347    }
348
349    /// Mock volatile PhysicalExpr for testing purposes
350    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
351    struct MockVolatileExpr {
352        volatile: bool,
353    }
354
355    impl MockVolatileExpr {
356        fn new(volatile: bool) -> Self {
357            Self { volatile }
358        }
359    }
360
361    impl fmt::Display for MockVolatileExpr {
362        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
363            write!(f, "MockVolatile({})", self.volatile)
364        }
365    }
366
367    impl PhysicalExpr for MockVolatileExpr {
368        fn as_any(&self) -> &dyn Any {
369            self
370        }
371
372        fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
373            Ok(DataType::Boolean)
374        }
375
376        fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
377            Ok(false)
378        }
379
380        fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
381            Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(
382                self.volatile,
383            ))))
384        }
385
386        fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
387            vec![]
388        }
389
390        fn with_new_children(
391            self: Arc<Self>,
392            _children: Vec<Arc<dyn PhysicalExpr>>,
393        ) -> Result<Arc<dyn PhysicalExpr>> {
394            Ok(self)
395        }
396
397        fn is_volatile_node(&self) -> bool {
398            self.volatile
399        }
400
401        fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
402            write!(f, "mock_volatile({})", self.volatile)
403        }
404    }
405
406    #[test]
407    fn test_nested_expression_volatility() {
408        // Test that is_volatile() recursively detects volatility in expression trees
409
410        // Create a volatile mock expression
411        let volatile_expr =
412            Arc::new(MockVolatileExpr::new(true)) as Arc<dyn PhysicalExpr>;
413        assert!(volatile_expr.is_volatile_node());
414        assert!(is_volatile(&volatile_expr));
415
416        // Create a non-volatile mock expression
417        let stable_expr = Arc::new(MockVolatileExpr::new(false)) as Arc<dyn PhysicalExpr>;
418        assert!(!stable_expr.is_volatile_node());
419        assert!(!is_volatile(&stable_expr));
420
421        // Create a literal (non-volatile)
422        let literal =
423            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
424        assert!(!literal.is_volatile_node());
425        assert!(!is_volatile(&literal));
426
427        // Test composite expression: volatile_expr AND literal
428        // The BinaryExpr itself is not volatile, but contains a volatile child
429        let composite_expr = Arc::new(BinaryExpr::new(
430            Arc::clone(&volatile_expr),
431            Operator::And,
432            Arc::clone(&literal),
433        )) as Arc<dyn PhysicalExpr>;
434
435        assert!(!composite_expr.is_volatile_node()); // BinaryExpr itself is not volatile
436        assert!(is_volatile(&composite_expr)); // But it contains a volatile child
437
438        // Test composite expression with all non-volatile children
439        let stable_composite = Arc::new(BinaryExpr::new(
440            Arc::clone(&stable_expr),
441            Operator::And,
442            Arc::clone(&literal),
443        )) as Arc<dyn PhysicalExpr>;
444
445        assert!(!stable_composite.is_volatile_node());
446        assert!(!is_volatile(&stable_composite)); // No volatile children
447    }
448}