datafusion_physical_expr/
physical_expr.rs1use std::sync::Arc;
19
20use crate::expressions::{self, Column};
21use crate::{create_physical_expr, LexOrdering, PhysicalSortExpr};
22
23use arrow::compute::SortOptions;
24use arrow::datatypes::Schema;
25use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
26use datafusion_common::{plan_err, Result};
27use datafusion_common::{DFSchema, HashMap};
28use datafusion_expr::execution_props::ExecutionProps;
29use datafusion_expr::{Expr, SortExpr};
30
31use itertools::izip;
32
33pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
35
36pub fn add_offset_to_expr(
39 expr: Arc<dyn PhysicalExpr>,
40 offset: isize,
41) -> Result<Arc<dyn PhysicalExpr>> {
42 expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
43 Some(col) => {
44 let Some(idx) = col.index().checked_add_signed(offset) else {
45 return plan_err!("Column index overflow");
46 };
47 Ok(Transformed::yes(Arc::new(Column::new(col.name(), idx))))
48 }
49 None => Ok(Transformed::no(e)),
50 })
51 .data()
52}
53
54pub fn physical_exprs_contains(
57 physical_exprs: &[Arc<dyn PhysicalExpr>],
58 expr: &Arc<dyn PhysicalExpr>,
59) -> bool {
60 physical_exprs
61 .iter()
62 .any(|physical_expr| physical_expr.eq(expr))
63}
64
65pub fn physical_exprs_equal(
67 lhs: &[Arc<dyn PhysicalExpr>],
68 rhs: &[Arc<dyn PhysicalExpr>],
69) -> bool {
70 lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
71}
72
73pub fn physical_exprs_bag_equal(
76 lhs: &[Arc<dyn PhysicalExpr>],
77 rhs: &[Arc<dyn PhysicalExpr>],
78) -> bool {
79 let mut multi_set_lhs: HashMap<_, usize> = HashMap::new();
80 let mut multi_set_rhs: HashMap<_, usize> = HashMap::new();
81 for expr in lhs {
82 *multi_set_lhs.entry(expr).or_insert(0) += 1;
83 }
84 for expr in rhs {
85 *multi_set_rhs.entry(expr).or_insert(0) += 1;
86 }
87 multi_set_lhs == multi_set_rhs
88}
89
90pub fn create_ordering(
132 schema: &Schema,
133 sort_order: &[Vec<SortExpr>],
134) -> Result<Vec<LexOrdering>> {
135 let mut all_sort_orders = vec![];
136
137 for (group_idx, exprs) in sort_order.iter().enumerate() {
138 let mut sort_exprs = vec![];
140 for (expr_idx, sort) in exprs.iter().enumerate() {
141 match &sort.expr {
142 Expr::Column(col) => match expressions::col(&col.name, schema) {
143 Ok(expr) => {
144 let opts = SortOptions::new(!sort.asc, sort.nulls_first);
145 sort_exprs.push(PhysicalSortExpr::new(expr, opts));
146 }
147 Err(_) => break,
150 },
151 expr => {
152 return plan_err!(
153 "Expected single column reference in sort_order[{}][{}], got {}",
154 group_idx,
155 expr_idx,
156 expr
157 );
158 }
159 }
160 }
161 all_sort_orders.extend(LexOrdering::new(sort_exprs));
162 }
163 Ok(all_sort_orders)
164}
165
166pub fn create_physical_sort_expr(
168 e: &SortExpr,
169 input_dfschema: &DFSchema,
170 execution_props: &ExecutionProps,
171) -> Result<PhysicalSortExpr> {
172 create_physical_expr(&e.expr, input_dfschema, execution_props).map(|expr| {
173 let options = SortOptions::new(!e.asc, e.nulls_first);
174 PhysicalSortExpr::new(expr, options)
175 })
176}
177
178pub fn create_physical_sort_exprs(
180 exprs: &[SortExpr],
181 input_dfschema: &DFSchema,
182 execution_props: &ExecutionProps,
183) -> Result<Vec<PhysicalSortExpr>> {
184 exprs
185 .iter()
186 .map(|e| create_physical_sort_expr(e, input_dfschema, execution_props))
187 .collect()
188}
189
190pub fn add_offset_to_physical_sort_exprs(
191 sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
192 offset: isize,
193) -> Result<Vec<PhysicalSortExpr>> {
194 sort_exprs
195 .into_iter()
196 .map(|mut sort_expr| {
197 sort_expr.expr = add_offset_to_expr(sort_expr.expr, offset)?;
198 Ok(sort_expr)
199 })
200 .collect()
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    use crate::expressions::{BinaryExpr, Column, Literal};
    use crate::physical_expr::{
        physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
    };
    use datafusion_physical_expr_common::physical_expr::is_volatile;

    use arrow::datatypes::{DataType, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::ColumnarValue;
    use datafusion_expr::Operator;
    use std::any::Any;
    use std::fmt;

    /// `physical_exprs_contains` finds literals and columns that are in the
    /// list and rejects those that are not.
    #[test]
    fn test_physical_exprs_contains() {
        let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
            as Arc<dyn PhysicalExpr>;
        let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
            as Arc<dyn PhysicalExpr>;
        let lit4 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(4)))) as Arc<dyn PhysicalExpr>;
        let lit2 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
        let lit1 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
        let col_a_expr = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
        let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
        let col_c_expr = Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>;

        // The haystack: lit(true), lit(false), lit(4), lit(2), col(a), col(b).
        let physical_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::clone(&lit_true),
            Arc::clone(&lit_false),
            Arc::clone(&lit4),
            Arc::clone(&lit2),
            Arc::clone(&col_a_expr),
            Arc::clone(&col_b_expr),
        ];

        // Expressions that are present.
        assert!(physical_exprs_contains(&physical_exprs, &lit_true));
        assert!(physical_exprs_contains(&physical_exprs, &lit2));
        assert!(physical_exprs_contains(&physical_exprs, &col_b_expr));

        // Expressions that are absent.
        assert!(!physical_exprs_contains(&physical_exprs, &col_c_expr));
        assert!(!physical_exprs_contains(&physical_exprs, &lit1));
    }

    /// Ordered and bag equality agree on identical lists and on lists that
    /// contain different expressions.
    #[test]
    fn test_physical_exprs_equal() {
        let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
            as Arc<dyn PhysicalExpr>;
        let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
            as Arc<dyn PhysicalExpr>;
        let lit1 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
        let lit2 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
        let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;

        let vec1 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
        let vec2 = vec![Arc::clone(&lit_true), Arc::clone(&col_b_expr)];
        let vec3 = vec![Arc::clone(&lit2), Arc::clone(&lit1)];
        let vec4 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];

        // Same contents in the same order compare equal either way.
        assert!(physical_exprs_equal(&vec1, &vec1));
        assert!(physical_exprs_equal(&vec1, &vec4));
        assert!(physical_exprs_bag_equal(&vec1, &vec1));
        assert!(physical_exprs_bag_equal(&vec1, &vec4));

        // Different contents compare unequal either way.
        assert!(!physical_exprs_equal(&vec1, &vec2));
        assert!(!physical_exprs_equal(&vec1, &vec3));
        assert!(!physical_exprs_bag_equal(&vec1, &vec2));
        assert!(!physical_exprs_bag_equal(&vec1, &vec3));
    }

    /// Bag equality honours multiplicities but ignores order, whereas ordered
    /// equality is position-sensitive.
    #[test]
    fn test_physical_exprs_set_equal() {
        // Same distinct expressions, but different multiplicities:
        // {a, a, b} vs {b, b, a}.
        let list1: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("b", 1)),
        ];
        let list2: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("a", 0)),
        ];
        assert!(!physical_exprs_bag_equal(
            list1.as_slice(),
            list2.as_slice()
        ));
        assert!(!physical_exprs_bag_equal(
            list2.as_slice(),
            list1.as_slice()
        ));
        assert!(!physical_exprs_equal(list1.as_slice(), list2.as_slice()));
        assert!(!physical_exprs_equal(list2.as_slice(), list1.as_slice()));

        // Same multiplicities, different order: {a, b, c, a, b} vs {b, b, a, c, a}.
        let list3: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("c", 2)),
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("b", 1)),
        ];
        let list4: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("c", 2)),
            Arc::new(Column::new("a", 0)),
        ];
        assert!(physical_exprs_bag_equal(list3.as_slice(), list4.as_slice()));
        assert!(physical_exprs_bag_equal(list4.as_slice(), list3.as_slice()));
        assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice()));
        assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice()));
        assert!(!physical_exprs_equal(list3.as_slice(), list4.as_slice()));
        assert!(!physical_exprs_equal(list4.as_slice(), list3.as_slice()));
        // Ordered equality must still be reflexive for each list.
        assert!(physical_exprs_equal(list3.as_slice(), list3.as_slice()));
        assert!(physical_exprs_equal(list4.as_slice(), list4.as_slice()));
    }

    /// Literals and columns are reported as non-volatile, both at the node
    /// level and by `is_volatile`.
    #[test]
    fn test_is_volatile_default_behavior() {
        let literal =
            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let column = Arc::new(Column::new("test", 0)) as Arc<dyn PhysicalExpr>;

        // Node-level check.
        assert!(!literal.is_volatile_node());
        assert!(!column.is_volatile_node());

        // Whole-expression check.
        assert!(!is_volatile(&literal));
        assert!(!is_volatile(&column));
    }

    /// Leaf expression for tests whose `is_volatile_node` answer is fixed at
    /// construction time.
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    struct MockVolatileExpr {
        volatile: bool,
    }

    impl MockVolatileExpr {
        /// Creates a mock expression reporting the given volatility.
        fn new(volatile: bool) -> Self {
            Self { volatile }
        }
    }

    impl fmt::Display for MockVolatileExpr {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "MockVolatile({})", self.volatile)
        }
    }

    impl PhysicalExpr for MockVolatileExpr {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
            Ok(DataType::Boolean)
        }

        fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
            Ok(false)
        }

        /// Evaluates to a boolean scalar equal to the `volatile` flag; the
        /// input batch is ignored.
        fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
            Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(
                self.volatile,
            ))))
        }

        fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
            vec![]
        }

        /// The mock has no children, so it is returned unchanged.
        fn with_new_children(
            self: Arc<Self>,
            _children: Vec<Arc<dyn PhysicalExpr>>,
        ) -> Result<Arc<dyn PhysicalExpr>> {
            Ok(self)
        }

        fn is_volatile_node(&self) -> bool {
            self.volatile
        }

        fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "mock_volatile({})", self.volatile)
        }
    }

    /// `is_volatile` reports a composite expression as volatile when one of
    /// its children is, even though the composite node itself is not.
    #[test]
    fn test_nested_expression_volatility() {
        // A leaf that declares itself volatile.
        let volatile_expr =
            Arc::new(MockVolatileExpr::new(true)) as Arc<dyn PhysicalExpr>;
        assert!(volatile_expr.is_volatile_node());
        assert!(is_volatile(&volatile_expr));

        // A leaf that declares itself stable.
        let stable_expr = Arc::new(MockVolatileExpr::new(false)) as Arc<dyn PhysicalExpr>;
        assert!(!stable_expr.is_volatile_node());
        assert!(!is_volatile(&stable_expr));

        // A plain literal is stable as well.
        let literal =
            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        assert!(!literal.is_volatile_node());
        assert!(!is_volatile(&literal));

        // volatile_expr AND literal
        let composite_expr = Arc::new(BinaryExpr::new(
            Arc::clone(&volatile_expr),
            Operator::And,
            Arc::clone(&literal),
        )) as Arc<dyn PhysicalExpr>;

        // The binary node itself is not volatile, but the expression as a
        // whole is because of its volatile child.
        assert!(!composite_expr.is_volatile_node());
        assert!(is_volatile(&composite_expr));

        // stable_expr AND literal
        let stable_composite = Arc::new(BinaryExpr::new(
            Arc::clone(&stable_expr),
            Operator::And,
            Arc::clone(&literal),
        )) as Arc<dyn PhysicalExpr>;

        // Neither the node nor any child is volatile.
        assert!(!stable_composite.is_volatile_node());
        assert!(!is_volatile(&stable_composite));
    }
}