datafusion_physical_expr/
physical_expr.rs1use std::sync::Arc;
19
20use crate::expressions::{self, Column};
21use crate::{create_physical_expr, LexOrdering, PhysicalSortExpr};
22
23use arrow::compute::SortOptions;
24use arrow::datatypes::Schema;
25use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
26use datafusion_common::{plan_err, Result};
27use datafusion_common::{DFSchema, HashMap};
28use datafusion_expr::execution_props::ExecutionProps;
29use datafusion_expr::{Expr, SortExpr};
30
31use itertools::izip;
32
33pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
35
36pub fn add_offset_to_expr(
39 expr: Arc<dyn PhysicalExpr>,
40 offset: isize,
41) -> Result<Arc<dyn PhysicalExpr>> {
42 expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
43 Some(col) => {
44 let Some(idx) = col.index().checked_add_signed(offset) else {
45 return plan_err!("Column index overflow");
46 };
47 Ok(Transformed::yes(Arc::new(Column::new(col.name(), idx))))
48 }
49 None => Ok(Transformed::no(e)),
50 })
51 .data()
52}
53
54pub fn physical_exprs_contains(
57 physical_exprs: &[Arc<dyn PhysicalExpr>],
58 expr: &Arc<dyn PhysicalExpr>,
59) -> bool {
60 physical_exprs
61 .iter()
62 .any(|physical_expr| physical_expr.eq(expr))
63}
64
65pub fn physical_exprs_equal(
67 lhs: &[Arc<dyn PhysicalExpr>],
68 rhs: &[Arc<dyn PhysicalExpr>],
69) -> bool {
70 lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
71}
72
73pub fn physical_exprs_bag_equal(
76 lhs: &[Arc<dyn PhysicalExpr>],
77 rhs: &[Arc<dyn PhysicalExpr>],
78) -> bool {
79 let mut multi_set_lhs: HashMap<_, usize> = HashMap::new();
80 let mut multi_set_rhs: HashMap<_, usize> = HashMap::new();
81 for expr in lhs {
82 *multi_set_lhs.entry(expr).or_insert(0) += 1;
83 }
84 for expr in rhs {
85 *multi_set_rhs.entry(expr).or_insert(0) += 1;
86 }
87 multi_set_lhs == multi_set_rhs
88}
89
90pub fn create_ordering(
132 schema: &Schema,
133 sort_order: &[Vec<SortExpr>],
134) -> Result<Vec<LexOrdering>> {
135 let mut all_sort_orders = vec![];
136
137 for (group_idx, exprs) in sort_order.iter().enumerate() {
138 let mut sort_exprs = vec![];
140 for (expr_idx, sort) in exprs.iter().enumerate() {
141 match &sort.expr {
142 Expr::Column(col) => match expressions::col(&col.name, schema) {
143 Ok(expr) => {
144 let opts = SortOptions::new(!sort.asc, sort.nulls_first);
145 sort_exprs.push(PhysicalSortExpr::new(expr, opts));
146 }
147 Err(_) => break,
150 },
151 expr => {
152 return plan_err!(
153 "Expected single column reference in sort_order[{}][{}], got {}",
154 group_idx,
155 expr_idx,
156 expr
157 );
158 }
159 }
160 }
161 all_sort_orders.extend(LexOrdering::new(sort_exprs));
162 }
163 Ok(all_sort_orders)
164}
165
166pub fn create_physical_sort_expr(
168 e: &SortExpr,
169 input_dfschema: &DFSchema,
170 execution_props: &ExecutionProps,
171) -> Result<PhysicalSortExpr> {
172 create_physical_expr(&e.expr, input_dfschema, execution_props).map(|expr| {
173 let options = SortOptions::new(!e.asc, e.nulls_first);
174 PhysicalSortExpr::new(expr, options)
175 })
176}
177
178pub fn create_physical_sort_exprs(
180 exprs: &[SortExpr],
181 input_dfschema: &DFSchema,
182 execution_props: &ExecutionProps,
183) -> Result<Vec<PhysicalSortExpr>> {
184 exprs
185 .iter()
186 .map(|e| create_physical_sort_expr(e, input_dfschema, execution_props))
187 .collect()
188}
189
190pub fn add_offset_to_physical_sort_exprs(
191 sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
192 offset: isize,
193) -> Result<Vec<PhysicalSortExpr>> {
194 sort_exprs
195 .into_iter()
196 .map(|mut sort_expr| {
197 sort_expr.expr = add_offset_to_expr(sort_expr.expr, offset)?;
198 Ok(sort_expr)
199 })
200 .collect()
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    use crate::expressions::{Column, Literal};
    use crate::physical_expr::{
        physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
    };

    use datafusion_common::ScalarValue;

    /// Boolean literal as a type-erased physical expression.
    fn lit_bool(value: bool) -> Arc<dyn PhysicalExpr> {
        Arc::new(Literal::new(ScalarValue::Boolean(Some(value))))
    }

    /// `Int32` literal as a type-erased physical expression.
    fn lit_i32(value: i32) -> Arc<dyn PhysicalExpr> {
        Arc::new(Literal::new(ScalarValue::Int32(Some(value))))
    }

    /// Column reference as a type-erased physical expression.
    fn column(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
        Arc::new(Column::new(name, index))
    }

    #[test]
    fn test_physical_exprs_contains() {
        let lit_true = lit_bool(true);
        let lit_false = lit_bool(false);
        let lit4 = lit_i32(4);
        let lit2 = lit_i32(2);
        let lit1 = lit_i32(1);
        let col_a_expr = column("a", 0);
        let col_b_expr = column("b", 1);
        let col_c_expr = column("c", 2);

        // The list under test: `lit1` and `col_c_expr` are deliberately left out.
        let physical_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::clone(&lit_true),
            Arc::clone(&lit_false),
            Arc::clone(&lit4),
            Arc::clone(&lit2),
            Arc::clone(&col_a_expr),
            Arc::clone(&col_b_expr),
        ];

        // Expressions present in the list are found.
        assert!(physical_exprs_contains(&physical_exprs, &lit_true));
        assert!(physical_exprs_contains(&physical_exprs, &lit2));
        assert!(physical_exprs_contains(&physical_exprs, &col_b_expr));

        // Expressions absent from the list are not found.
        assert!(!physical_exprs_contains(&physical_exprs, &col_c_expr));
        assert!(!physical_exprs_contains(&physical_exprs, &lit1));
    }

    #[test]
    fn test_physical_exprs_equal() {
        let lit_true = lit_bool(true);
        let lit_false = lit_bool(false);
        let lit1 = lit_i32(1);
        let lit2 = lit_i32(2);
        let col_b_expr = column("b", 1);

        let vec1 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
        let vec2 = vec![Arc::clone(&lit_true), Arc::clone(&col_b_expr)];
        let vec3 = vec![Arc::clone(&lit2), Arc::clone(&lit1)];
        let vec4 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];

        // Identical contents compare equal, both ordered and as bags.
        assert!(physical_exprs_equal(&vec1, &vec1));
        assert!(physical_exprs_equal(&vec1, &vec4));
        assert!(physical_exprs_bag_equal(&vec1, &vec1));
        assert!(physical_exprs_bag_equal(&vec1, &vec4));

        // Differing contents compare unequal, both ordered and as bags.
        assert!(!physical_exprs_equal(&vec1, &vec2));
        assert!(!physical_exprs_equal(&vec1, &vec3));
        assert!(!physical_exprs_bag_equal(&vec1, &vec2));
        assert!(!physical_exprs_bag_equal(&vec1, &vec3));
    }

    #[test]
    fn test_physical_exprs_set_equal() {
        // Same distinct columns, but different multiplicities: not bag-equal.
        let list1: Vec<Arc<dyn PhysicalExpr>> =
            vec![column("a", 0), column("a", 0), column("b", 1)];
        let list2: Vec<Arc<dyn PhysicalExpr>> =
            vec![column("b", 1), column("b", 1), column("a", 0)];
        assert!(!physical_exprs_bag_equal(
            list1.as_slice(),
            list2.as_slice()
        ));
        assert!(!physical_exprs_bag_equal(
            list2.as_slice(),
            list1.as_slice()
        ));
        assert!(!physical_exprs_equal(list1.as_slice(), list2.as_slice()));
        assert!(!physical_exprs_equal(list2.as_slice(), list1.as_slice()));

        // Same columns with the same multiplicities in a different order:
        // bag-equal, but not equal position by position.
        let list3: Vec<Arc<dyn PhysicalExpr>> = vec![
            column("a", 0),
            column("b", 1),
            column("c", 2),
            column("a", 0),
            column("b", 1),
        ];
        let list4: Vec<Arc<dyn PhysicalExpr>> = vec![
            column("b", 1),
            column("b", 1),
            column("a", 0),
            column("c", 2),
            column("a", 0),
        ];
        assert!(physical_exprs_bag_equal(list3.as_slice(), list4.as_slice()));
        assert!(physical_exprs_bag_equal(list4.as_slice(), list3.as_slice()));
        assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice()));
        assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice()));
        assert!(!physical_exprs_equal(list3.as_slice(), list4.as_slice()));
        assert!(!physical_exprs_equal(list4.as_slice(), list3.as_slice()));
        assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice()));
        assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice()));
    }
}