datafusion_physical_expr/
physical_expr.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::create_physical_expr;
21use datafusion_common::{DFSchema, HashMap};
22use datafusion_expr::execution_props::ExecutionProps;
23pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
24pub use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
25use itertools::izip;
26
27/// This function is similar to the `contains` method of `Vec`. It finds
28/// whether `expr` is among `physical_exprs`.
29pub fn physical_exprs_contains(
30    physical_exprs: &[Arc<dyn PhysicalExpr>],
31    expr: &Arc<dyn PhysicalExpr>,
32) -> bool {
33    physical_exprs
34        .iter()
35        .any(|physical_expr| physical_expr.eq(expr))
36}
37
38/// Checks whether the given physical expression slices are equal.
39pub fn physical_exprs_equal(
40    lhs: &[Arc<dyn PhysicalExpr>],
41    rhs: &[Arc<dyn PhysicalExpr>],
42) -> bool {
43    lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
44}
45
46/// Checks whether the given physical expression slices are equal in the sense
47/// of bags (multi-sets), disregarding their orderings.
48pub fn physical_exprs_bag_equal(
49    lhs: &[Arc<dyn PhysicalExpr>],
50    rhs: &[Arc<dyn PhysicalExpr>],
51) -> bool {
52    let mut multi_set_lhs: HashMap<_, usize> = HashMap::new();
53    let mut multi_set_rhs: HashMap<_, usize> = HashMap::new();
54    for expr in lhs {
55        *multi_set_lhs.entry(expr).or_insert(0) += 1;
56    }
57    for expr in rhs {
58        *multi_set_rhs.entry(expr).or_insert(0) += 1;
59    }
60    multi_set_lhs == multi_set_rhs
61}
62
63use crate::{expressions, LexOrdering, PhysicalSortExpr};
64use arrow::compute::SortOptions;
65use arrow::datatypes::Schema;
66use datafusion_common::plan_err;
67use datafusion_common::Result;
68use datafusion_expr::{Expr, SortExpr};
69
70/// Converts logical sort expressions to physical sort expressions
71///
72/// This function transforms a collection of logical sort expressions into their physical
73/// representation that can be used during query execution.
74///
75/// # Arguments
76///
77/// * `schema` - The schema containing column definitions
78/// * `sort_order` - A collection of logical sort expressions grouped into lexicographic orderings
79///
80/// # Returns
81///
82/// A vector of lexicographic orderings for physical execution, or an error if the transformation fails
83///
84/// # Examples
85///
86/// ```
87/// // Create orderings from columns "id" and "name"
88/// # use arrow::datatypes::{Schema, Field, DataType};
89/// # use datafusion_physical_expr::create_ordering;
90/// # use datafusion_common::Column;
91/// # use datafusion_expr::{Expr, SortExpr};
92/// #
93/// // Create a schema with two fields
94/// let schema = Schema::new(vec![
95///     Field::new("id", DataType::Int32, false),
96///     Field::new("name", DataType::Utf8, false),
97/// ]);
98///
99/// let sort_exprs = vec![
100///     vec![
101///         SortExpr { expr: Expr::Column(Column::new(Some("t"), "id")), asc: true, nulls_first: false }
102///     ],
103///     vec![
104///         SortExpr { expr: Expr::Column(Column::new(Some("t"), "name")), asc: false, nulls_first: true }
105///     ]
106/// ];
107/// let result = create_ordering(&schema, &sort_exprs).unwrap();
108/// ```
109pub fn create_ordering(
110    schema: &Schema,
111    sort_order: &[Vec<SortExpr>],
112) -> Result<Vec<LexOrdering>> {
113    let mut all_sort_orders = vec![];
114
115    for (group_idx, exprs) in sort_order.iter().enumerate() {
116        // Construct PhysicalSortExpr objects from Expr objects:
117        let mut sort_exprs = LexOrdering::default();
118        for (expr_idx, sort) in exprs.iter().enumerate() {
119            match &sort.expr {
120                Expr::Column(col) => match expressions::col(&col.name, schema) {
121                    Ok(expr) => {
122                        sort_exprs.push(PhysicalSortExpr {
123                            expr,
124                            options: SortOptions {
125                                descending: !sort.asc,
126                                nulls_first: sort.nulls_first,
127                            },
128                        });
129                    }
130                    // Cannot find expression in the projected_schema, stop iterating
131                    // since rest of the orderings are violated
132                    Err(_) => break,
133                },
134                expr => {
135                    return plan_err!(
136                        "Expected single column reference in sort_order[{}][{}], got {}",
137                        group_idx,
138                        expr_idx,
139                        expr
140                    );
141                }
142            }
143        }
144        if !sort_exprs.is_empty() {
145            all_sort_orders.push(sort_exprs);
146        }
147    }
148    Ok(all_sort_orders)
149}
150
151/// Create a physical sort expression from a logical expression
152pub fn create_physical_sort_expr(
153    e: &SortExpr,
154    input_dfschema: &DFSchema,
155    execution_props: &ExecutionProps,
156) -> Result<PhysicalSortExpr> {
157    let SortExpr {
158        expr,
159        asc,
160        nulls_first,
161    } = e;
162    Ok(PhysicalSortExpr {
163        expr: create_physical_expr(expr, input_dfschema, execution_props)?,
164        options: SortOptions {
165            descending: !asc,
166            nulls_first: *nulls_first,
167        },
168    })
169}
170
171/// Create vector of physical sort expression from a vector of logical expression
172pub fn create_physical_sort_exprs(
173    exprs: &[SortExpr],
174    input_dfschema: &DFSchema,
175    execution_props: &ExecutionProps,
176) -> Result<LexOrdering> {
177    exprs
178        .iter()
179        .map(|expr| create_physical_sort_expr(expr, input_dfschema, execution_props))
180        .collect::<Result<LexOrdering>>()
181}
182
#[cfg(test)]
mod tests {
    use super::*;

    use crate::expressions::{Column, Literal};
    use crate::physical_expr::{
        physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
    };

    use datafusion_common::ScalarValue;

    /// `physical_exprs_contains` finds literals and columns that are present
    /// and rejects ones that are absent.
    #[test]
    fn test_physical_exprs_contains() {
        let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
            as Arc<dyn PhysicalExpr>;
        let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
            as Arc<dyn PhysicalExpr>;
        let lit4 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(4)))) as Arc<dyn PhysicalExpr>;
        let lit2 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
        let lit1 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
        let col_a_expr = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
        let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
        let col_c_expr = Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>;

        // lit(true), lit(false), lit(4), lit(2), Col(a), Col(b)
        let physical_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::clone(&lit_true),
            Arc::clone(&lit_false),
            Arc::clone(&lit4),
            Arc::clone(&lit2),
            Arc::clone(&col_a_expr),
            Arc::clone(&col_b_expr),
        ];
        // The expressions below are inside `physical_exprs`.
        assert!(physical_exprs_contains(&physical_exprs, &lit_true));
        assert!(physical_exprs_contains(&physical_exprs, &lit2));
        assert!(physical_exprs_contains(&physical_exprs, &col_b_expr));

        // The expressions below are not inside `physical_exprs`.
        assert!(!physical_exprs_contains(&physical_exprs, &col_c_expr));
        assert!(!physical_exprs_contains(&physical_exprs, &lit1));
    }

    /// Ordered and bag equality agree on lists without duplicates that are
    /// either identical or contain different elements.
    #[test]
    fn test_physical_exprs_equal() {
        let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
            as Arc<dyn PhysicalExpr>;
        let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
            as Arc<dyn PhysicalExpr>;
        let lit1 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
        let lit2 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
        let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;

        let vec1 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
        let vec2 = vec![Arc::clone(&lit_true), Arc::clone(&col_b_expr)];
        let vec3 = vec![Arc::clone(&lit2), Arc::clone(&lit1)];
        let vec4 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];

        // These vectors are the same.
        assert!(physical_exprs_equal(&vec1, &vec1));
        assert!(physical_exprs_equal(&vec1, &vec4));
        assert!(physical_exprs_bag_equal(&vec1, &vec1));
        assert!(physical_exprs_bag_equal(&vec1, &vec4));

        // These vectors are different.
        assert!(!physical_exprs_equal(&vec1, &vec2));
        assert!(!physical_exprs_equal(&vec1, &vec3));
        assert!(!physical_exprs_bag_equal(&vec1, &vec2));
        assert!(!physical_exprs_bag_equal(&vec1, &vec3));
    }

    /// Exercises bag (multi-set) equality against ordered equality on lists
    /// that contain duplicate expressions.
    #[test]
    fn test_physical_exprs_set_equal() {
        // Same distinct elements, but different multiplicities: [a, a, b]
        // versus [b, b, a]. Neither bag nor ordered equality may hold.
        let list1: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("b", 1)),
        ];
        let list2: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("a", 0)),
        ];
        assert!(!physical_exprs_bag_equal(
            list1.as_slice(),
            list2.as_slice()
        ));
        assert!(!physical_exprs_bag_equal(
            list2.as_slice(),
            list1.as_slice()
        ));
        assert!(!physical_exprs_equal(list1.as_slice(), list2.as_slice()));
        assert!(!physical_exprs_equal(list2.as_slice(), list1.as_slice()));

        // Same multiplicities in a different order: equal as bags, but not
        // equal position-by-position.
        let list3: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("c", 2)),
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("b", 1)),
        ];
        let list4: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("c", 2)),
            Arc::new(Column::new("a", 0)),
        ];
        assert!(physical_exprs_bag_equal(list3.as_slice(), list4.as_slice()));
        assert!(physical_exprs_bag_equal(list4.as_slice(), list3.as_slice()));
        assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice()));
        assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice()));
        assert!(!physical_exprs_equal(list3.as_slice(), list4.as_slice()));
        assert!(!physical_exprs_equal(list4.as_slice(), list3.as_slice()));
        // Ordered equality is reflexive even when the list has duplicates.
        assert!(physical_exprs_equal(list3.as_slice(), list3.as_slice()));
        assert!(physical_exprs_equal(list4.as_slice(), list4.as_slice()));
    }
}