datafusion_physical_expr/
physical_expr.rs1use std::sync::Arc;
19
20use crate::create_physical_expr;
21use datafusion_common::{DFSchema, HashMap};
22use datafusion_expr::execution_props::ExecutionProps;
23pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
24pub use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
25use itertools::izip;
26
27pub fn physical_exprs_contains(
30 physical_exprs: &[Arc<dyn PhysicalExpr>],
31 expr: &Arc<dyn PhysicalExpr>,
32) -> bool {
33 physical_exprs
34 .iter()
35 .any(|physical_expr| physical_expr.eq(expr))
36}
37
38pub fn physical_exprs_equal(
40 lhs: &[Arc<dyn PhysicalExpr>],
41 rhs: &[Arc<dyn PhysicalExpr>],
42) -> bool {
43 lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
44}
45
46pub fn physical_exprs_bag_equal(
49 lhs: &[Arc<dyn PhysicalExpr>],
50 rhs: &[Arc<dyn PhysicalExpr>],
51) -> bool {
52 let mut multi_set_lhs: HashMap<_, usize> = HashMap::new();
53 let mut multi_set_rhs: HashMap<_, usize> = HashMap::new();
54 for expr in lhs {
55 *multi_set_lhs.entry(expr).or_insert(0) += 1;
56 }
57 for expr in rhs {
58 *multi_set_rhs.entry(expr).or_insert(0) += 1;
59 }
60 multi_set_lhs == multi_set_rhs
61}
62
63use crate::{expressions, LexOrdering, PhysicalSortExpr};
64use arrow::compute::SortOptions;
65use arrow::datatypes::Schema;
66use datafusion_common::plan_err;
67use datafusion_common::Result;
68use datafusion_expr::{Expr, SortExpr};
69
70pub fn create_ordering(
110 schema: &Schema,
111 sort_order: &[Vec<SortExpr>],
112) -> Result<Vec<LexOrdering>> {
113 let mut all_sort_orders = vec![];
114
115 for (group_idx, exprs) in sort_order.iter().enumerate() {
116 let mut sort_exprs = LexOrdering::default();
118 for (expr_idx, sort) in exprs.iter().enumerate() {
119 match &sort.expr {
120 Expr::Column(col) => match expressions::col(&col.name, schema) {
121 Ok(expr) => {
122 sort_exprs.push(PhysicalSortExpr {
123 expr,
124 options: SortOptions {
125 descending: !sort.asc,
126 nulls_first: sort.nulls_first,
127 },
128 });
129 }
130 Err(_) => break,
133 },
134 expr => {
135 return plan_err!(
136 "Expected single column reference in sort_order[{}][{}], got {}",
137 group_idx,
138 expr_idx,
139 expr
140 );
141 }
142 }
143 }
144 if !sort_exprs.is_empty() {
145 all_sort_orders.push(sort_exprs);
146 }
147 }
148 Ok(all_sort_orders)
149}
150
151pub fn create_physical_sort_expr(
153 e: &SortExpr,
154 input_dfschema: &DFSchema,
155 execution_props: &ExecutionProps,
156) -> Result<PhysicalSortExpr> {
157 let SortExpr {
158 expr,
159 asc,
160 nulls_first,
161 } = e;
162 Ok(PhysicalSortExpr {
163 expr: create_physical_expr(expr, input_dfschema, execution_props)?,
164 options: SortOptions {
165 descending: !asc,
166 nulls_first: *nulls_first,
167 },
168 })
169}
170
171pub fn create_physical_sort_exprs(
173 exprs: &[SortExpr],
174 input_dfschema: &DFSchema,
175 execution_props: &ExecutionProps,
176) -> Result<LexOrdering> {
177 exprs
178 .iter()
179 .map(|expr| create_physical_sort_expr(expr, input_dfschema, execution_props))
180 .collect::<Result<LexOrdering>>()
181}
182
#[cfg(test)]
mod tests {
    use super::*;

    use crate::expressions::{Column, Literal};
    use crate::physical_expr::{
        physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
    };

    use datafusion_common::ScalarValue;

    /// Builds a boolean literal expression.
    fn lit_bool(value: bool) -> Arc<dyn PhysicalExpr> {
        Arc::new(Literal::new(ScalarValue::Boolean(Some(value))))
    }

    /// Builds an `Int32` literal expression.
    fn lit_i32(value: i32) -> Arc<dyn PhysicalExpr> {
        Arc::new(Literal::new(ScalarValue::Int32(Some(value))))
    }

    /// Builds a column reference expression.
    fn column(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
        Arc::new(Column::new(name, index))
    }

    #[test]
    fn test_physical_exprs_contains() {
        let lit_true = lit_bool(true);
        let lit_false = lit_bool(false);
        let lit4 = lit_i32(4);
        let lit2 = lit_i32(2);
        let lit1 = lit_i32(1);
        let col_a_expr = column("a", 0);
        let col_b_expr = column("b", 1);
        let col_c_expr = column("c", 2);

        // `lit1` and `col_c_expr` are deliberately left out of the list.
        let physical_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::clone(&lit_true),
            Arc::clone(&lit_false),
            Arc::clone(&lit4),
            Arc::clone(&lit2),
            Arc::clone(&col_a_expr),
            Arc::clone(&col_b_expr),
        ];

        // Expressions that are part of the list are found.
        assert!(physical_exprs_contains(&physical_exprs, &lit_true));
        assert!(physical_exprs_contains(&physical_exprs, &lit2));
        assert!(physical_exprs_contains(&physical_exprs, &col_b_expr));

        // Expressions that are not part of the list are not found.
        assert!(!physical_exprs_contains(&physical_exprs, &col_c_expr));
        assert!(!physical_exprs_contains(&physical_exprs, &lit1));
    }

    #[test]
    fn test_physical_exprs_equal() {
        let lit_true = lit_bool(true);
        let lit_false = lit_bool(false);
        let lit1 = lit_i32(1);
        let lit2 = lit_i32(2);
        let col_b_expr = column("b", 1);

        let vec1 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
        let vec2 = vec![Arc::clone(&lit_true), Arc::clone(&col_b_expr)];
        let vec3 = vec![Arc::clone(&lit2), Arc::clone(&lit1)];
        let vec4 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];

        // Identical contents in identical order: equal both ways.
        assert!(physical_exprs_equal(&vec1, &vec1));
        assert!(physical_exprs_equal(&vec1, &vec4));
        assert!(physical_exprs_bag_equal(&vec1, &vec1));
        assert!(physical_exprs_bag_equal(&vec1, &vec4));

        // Different contents: unequal both ways.
        assert!(!physical_exprs_equal(&vec1, &vec2));
        assert!(!physical_exprs_equal(&vec1, &vec3));
        assert!(!physical_exprs_bag_equal(&vec1, &vec2));
        assert!(!physical_exprs_bag_equal(&vec1, &vec3));
    }

    // Despite its historical name, this test exercises bag (multi-set)
    // equality: multiplicities matter, ordering does not.
    #[test]
    fn test_physical_exprs_set_equal() {
        // Same distinct expressions, different multiplicities (a, a, b vs b, b, a).
        let list1 = vec![column("a", 0), column("a", 0), column("b", 1)];
        let list2 = vec![column("b", 1), column("b", 1), column("a", 0)];
        assert!(!physical_exprs_bag_equal(&list1, &list2));
        assert!(!physical_exprs_bag_equal(&list2, &list1));
        assert!(!physical_exprs_equal(&list1, &list2));
        assert!(!physical_exprs_equal(&list2, &list1));

        // Same multiplicities, different order.
        let list3 = vec![
            column("a", 0),
            column("b", 1),
            column("c", 2),
            column("a", 0),
            column("b", 1),
        ];
        let list4 = vec![
            column("b", 1),
            column("b", 1),
            column("a", 0),
            column("c", 2),
            column("a", 0),
        ];
        assert!(physical_exprs_bag_equal(&list3, &list4));
        assert!(physical_exprs_bag_equal(&list4, &list3));
        assert!(physical_exprs_bag_equal(&list3, &list3));
        assert!(physical_exprs_bag_equal(&list4, &list4));
        // Positional equality is order-sensitive, so the permuted lists differ...
        assert!(!physical_exprs_equal(&list3, &list4));
        assert!(!physical_exprs_equal(&list4, &list3));
        // ...while each list is still positionally equal to itself.
        assert!(physical_exprs_equal(&list3, &list3));
        assert!(physical_exprs_equal(&list4, &list4));
    }
}