datafusion_physical_expr/
physical_expr.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::expressions::{self, Column};
21use crate::{create_physical_expr, LexOrdering, PhysicalSortExpr};
22
23use arrow::compute::SortOptions;
24use arrow::datatypes::Schema;
25use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
26use datafusion_common::{plan_err, Result};
27use datafusion_common::{DFSchema, HashMap};
28use datafusion_expr::execution_props::ExecutionProps;
29use datafusion_expr::{Expr, SortExpr};
30
31use itertools::izip;
32
33// Exports:
34pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
35
36/// Adds the `offset` value to `Column` indices inside `expr`. This function is
37/// generally used during the update of the right table schema in join operations.
38pub fn add_offset_to_expr(
39    expr: Arc<dyn PhysicalExpr>,
40    offset: isize,
41) -> Result<Arc<dyn PhysicalExpr>> {
42    expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
43        Some(col) => {
44            let Some(idx) = col.index().checked_add_signed(offset) else {
45                return plan_err!("Column index overflow");
46            };
47            Ok(Transformed::yes(Arc::new(Column::new(col.name(), idx))))
48        }
49        None => Ok(Transformed::no(e)),
50    })
51    .data()
52}
53
54/// This function is similar to the `contains` method of `Vec`. It finds
55/// whether `expr` is among `physical_exprs`.
56pub fn physical_exprs_contains(
57    physical_exprs: &[Arc<dyn PhysicalExpr>],
58    expr: &Arc<dyn PhysicalExpr>,
59) -> bool {
60    physical_exprs
61        .iter()
62        .any(|physical_expr| physical_expr.eq(expr))
63}
64
65/// Checks whether the given physical expression slices are equal.
66pub fn physical_exprs_equal(
67    lhs: &[Arc<dyn PhysicalExpr>],
68    rhs: &[Arc<dyn PhysicalExpr>],
69) -> bool {
70    lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
71}
72
73/// Checks whether the given physical expression slices are equal in the sense
74/// of bags (multi-sets), disregarding their orderings.
75pub fn physical_exprs_bag_equal(
76    lhs: &[Arc<dyn PhysicalExpr>],
77    rhs: &[Arc<dyn PhysicalExpr>],
78) -> bool {
79    let mut multi_set_lhs: HashMap<_, usize> = HashMap::new();
80    let mut multi_set_rhs: HashMap<_, usize> = HashMap::new();
81    for expr in lhs {
82        *multi_set_lhs.entry(expr).or_insert(0) += 1;
83    }
84    for expr in rhs {
85        *multi_set_rhs.entry(expr).or_insert(0) += 1;
86    }
87    multi_set_lhs == multi_set_rhs
88}
89
90/// Converts logical sort expressions to physical sort expressions.
91///
92/// This function transforms a collection of logical sort expressions into their
93/// physical representation that can be used during query execution.
94///
95/// # Arguments
96///
97/// * `schema` - The schema containing column definitions.
98/// * `sort_order` - A collection of logical sort expressions grouped into
99///   lexicographic orderings.
100///
101/// # Returns
102///
103/// A vector of lexicographic orderings for physical execution, or an error if
104/// the transformation fails.
105///
106/// # Examples
107///
108/// ```
109/// // Create orderings from columns "id" and "name"
110/// # use arrow::datatypes::{Schema, Field, DataType};
111/// # use datafusion_physical_expr::create_ordering;
112/// # use datafusion_common::Column;
113/// # use datafusion_expr::{Expr, SortExpr};
114/// #
115/// // Create a schema with two fields
116/// let schema = Schema::new(vec![
117///     Field::new("id", DataType::Int32, false),
118///     Field::new("name", DataType::Utf8, false),
119/// ]);
120///
121/// let sort_exprs = vec![
122///     vec![
123///         SortExpr { expr: Expr::Column(Column::new(Some("t"), "id")), asc: true, nulls_first: false }
124///     ],
125///     vec![
126///         SortExpr { expr: Expr::Column(Column::new(Some("t"), "name")), asc: false, nulls_first: true }
127///     ]
128/// ];
129/// let result = create_ordering(&schema, &sort_exprs).unwrap();
130/// ```
131pub fn create_ordering(
132    schema: &Schema,
133    sort_order: &[Vec<SortExpr>],
134) -> Result<Vec<LexOrdering>> {
135    let mut all_sort_orders = vec![];
136
137    for (group_idx, exprs) in sort_order.iter().enumerate() {
138        // Construct PhysicalSortExpr objects from Expr objects:
139        let mut sort_exprs = vec![];
140        for (expr_idx, sort) in exprs.iter().enumerate() {
141            match &sort.expr {
142                Expr::Column(col) => match expressions::col(&col.name, schema) {
143                    Ok(expr) => {
144                        let opts = SortOptions::new(!sort.asc, sort.nulls_first);
145                        sort_exprs.push(PhysicalSortExpr::new(expr, opts));
146                    }
147                    // Cannot find expression in the projected_schema, stop iterating
148                    // since rest of the orderings are violated
149                    Err(_) => break,
150                },
151                expr => {
152                    return plan_err!(
153                        "Expected single column reference in sort_order[{}][{}], got {}",
154                        group_idx,
155                        expr_idx,
156                        expr
157                    );
158                }
159            }
160        }
161        all_sort_orders.extend(LexOrdering::new(sort_exprs));
162    }
163    Ok(all_sort_orders)
164}
165
166/// Create a physical sort expression from a logical expression
167pub fn create_physical_sort_expr(
168    e: &SortExpr,
169    input_dfschema: &DFSchema,
170    execution_props: &ExecutionProps,
171) -> Result<PhysicalSortExpr> {
172    create_physical_expr(&e.expr, input_dfschema, execution_props).map(|expr| {
173        let options = SortOptions::new(!e.asc, e.nulls_first);
174        PhysicalSortExpr::new(expr, options)
175    })
176}
177
178/// Create vector of physical sort expression from a vector of logical expression
179pub fn create_physical_sort_exprs(
180    exprs: &[SortExpr],
181    input_dfschema: &DFSchema,
182    execution_props: &ExecutionProps,
183) -> Result<Vec<PhysicalSortExpr>> {
184    exprs
185        .iter()
186        .map(|e| create_physical_sort_expr(e, input_dfschema, execution_props))
187        .collect()
188}
189
190pub fn add_offset_to_physical_sort_exprs(
191    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
192    offset: isize,
193) -> Result<Vec<PhysicalSortExpr>> {
194    sort_exprs
195        .into_iter()
196        .map(|mut sort_expr| {
197            sort_expr.expr = add_offset_to_expr(sort_expr.expr, offset)?;
198            Ok(sort_expr)
199        })
200        .collect()
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    use crate::expressions::{Column, Literal};
    use crate::physical_expr::{
        physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
    };

    use arrow::datatypes::{DataType, Field};
    use datafusion_common::ScalarValue;

    #[test]
    fn test_physical_exprs_contains() {
        let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
            as Arc<dyn PhysicalExpr>;
        let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
            as Arc<dyn PhysicalExpr>;
        let lit4 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(4)))) as Arc<dyn PhysicalExpr>;
        let lit2 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
        let lit1 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
        let col_a_expr = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
        let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
        let col_c_expr = Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>;

        // lit(true), lit(false), lit(4), lit(2), Col(a), Col(b)
        let physical_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::clone(&lit_true),
            Arc::clone(&lit_false),
            Arc::clone(&lit4),
            Arc::clone(&lit2),
            Arc::clone(&col_a_expr),
            Arc::clone(&col_b_expr),
        ];
        // below expressions are inside physical_exprs
        assert!(physical_exprs_contains(&physical_exprs, &lit_true));
        assert!(physical_exprs_contains(&physical_exprs, &lit2));
        assert!(physical_exprs_contains(&physical_exprs, &col_b_expr));

        // below expressions are not inside physical_exprs
        assert!(!physical_exprs_contains(&physical_exprs, &col_c_expr));
        assert!(!physical_exprs_contains(&physical_exprs, &lit1));
    }

    #[test]
    fn test_physical_exprs_equal() {
        let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
            as Arc<dyn PhysicalExpr>;
        let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
            as Arc<dyn PhysicalExpr>;
        let lit1 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
        let lit2 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
        let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;

        let vec1 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
        let vec2 = vec![Arc::clone(&lit_true), Arc::clone(&col_b_expr)];
        let vec3 = vec![Arc::clone(&lit2), Arc::clone(&lit1)];
        let vec4 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];

        // these vectors are same
        assert!(physical_exprs_equal(&vec1, &vec1));
        assert!(physical_exprs_equal(&vec1, &vec4));
        assert!(physical_exprs_bag_equal(&vec1, &vec1));
        assert!(physical_exprs_bag_equal(&vec1, &vec4));

        // these vectors are different
        assert!(!physical_exprs_equal(&vec1, &vec2));
        assert!(!physical_exprs_equal(&vec1, &vec3));
        assert!(!physical_exprs_bag_equal(&vec1, &vec2));
        assert!(!physical_exprs_bag_equal(&vec1, &vec3));
    }

    #[test]
    fn test_physical_exprs_set_equal() {
        let list1: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("b", 1)),
        ];
        let list2: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("a", 0)),
        ];
        assert!(!physical_exprs_bag_equal(
            list1.as_slice(),
            list2.as_slice()
        ));
        assert!(!physical_exprs_bag_equal(
            list2.as_slice(),
            list1.as_slice()
        ));
        assert!(!physical_exprs_equal(list1.as_slice(), list2.as_slice()));
        assert!(!physical_exprs_equal(list2.as_slice(), list1.as_slice()));

        let list3: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("c", 2)),
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("b", 1)),
        ];
        let list4: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("c", 2)),
            Arc::new(Column::new("a", 0)),
        ];
        assert!(physical_exprs_bag_equal(list3.as_slice(), list4.as_slice()));
        assert!(physical_exprs_bag_equal(list4.as_slice(), list3.as_slice()));
        assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice()));
        assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice()));
        assert!(!physical_exprs_equal(list3.as_slice(), list4.as_slice()));
        assert!(!physical_exprs_equal(list4.as_slice(), list3.as_slice()));
    }

    #[test]
    fn test_physical_exprs_bag_equal_different_lengths() {
        let col_a = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
        let single = [Arc::clone(&col_a)];
        let double = [Arc::clone(&col_a), Arc::clone(&col_a)];

        // Same distinct elements, but different multiplicities.
        assert!(!physical_exprs_bag_equal(&single, &double));
        assert!(!physical_exprs_bag_equal(&double, &single));

        // Empty inputs.
        assert!(!physical_exprs_bag_equal(&[], &single));
        assert!(!physical_exprs_bag_equal(&single, &[]));
        assert!(physical_exprs_bag_equal(&[], &[]));
        assert!(physical_exprs_equal(&[], &[]));
        assert!(!physical_exprs_contains(&[], &col_a));
    }

    #[test]
    fn test_add_offset_to_expr() {
        let col_b = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;

        let shifted = add_offset_to_expr(Arc::clone(&col_b), 2).unwrap();
        let shifted = shifted.as_any().downcast_ref::<Column>().unwrap();
        assert_eq!(shifted.name(), "b");
        assert_eq!(shifted.index(), 3);

        // Negative offsets are fine as long as the index stays non-negative.
        let shifted = add_offset_to_expr(Arc::clone(&col_b), -1).unwrap();
        let shifted = shifted.as_any().downcast_ref::<Column>().unwrap();
        assert_eq!(shifted.index(), 0);

        // Shifting below zero is reported as an error instead of wrapping.
        assert!(add_offset_to_expr(col_b, -2).is_err());

        // Non-column expressions are returned unchanged.
        let lit =
            Arc::new(Literal::new(ScalarValue::Int32(Some(7)))) as Arc<dyn PhysicalExpr>;
        let unchanged = add_offset_to_expr(Arc::clone(&lit), 5).unwrap();
        assert!(physical_exprs_contains(&[lit], &unchanged));
    }

    #[test]
    fn test_create_ordering() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let sort_asc = |expr: Expr| SortExpr {
            expr,
            asc: true,
            nulls_first: false,
        };

        // A column missing from the schema truncates its group without failing.
        let orderings = create_ordering(
            &schema,
            &[vec![
                sort_asc(datafusion_expr::col("a")),
                sort_asc(datafusion_expr::col("missing")),
            ]],
        )
        .unwrap();
        assert_eq!(orderings.len(), 1);

        // Sort expressions that are not plain column references are rejected.
        let result =
            create_ordering(&schema, &[vec![sort_asc(datafusion_expr::lit(1))]]);
        assert!(result.is_err());
    }
}