datafusion_physical_expr/
physical_expr.rs1use std::sync::Arc;
19
20use crate::expressions::{self, Column};
21use crate::{create_physical_expr, LexOrdering, PhysicalSortExpr};
22
23use arrow::compute::SortOptions;
24use arrow::datatypes::{Schema, SchemaRef};
25use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
26use datafusion_common::{plan_err, Result};
27use datafusion_common::{DFSchema, HashMap};
28use datafusion_expr::execution_props::ExecutionProps;
29use datafusion_expr::{Expr, SortExpr};
30
31use itertools::izip;
32pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
34
35pub fn add_offset_to_expr(
38 expr: Arc<dyn PhysicalExpr>,
39 offset: isize,
40) -> Result<Arc<dyn PhysicalExpr>> {
41 expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
42 Some(col) => {
43 let Some(idx) = col.index().checked_add_signed(offset) else {
44 return plan_err!("Column index overflow");
45 };
46 Ok(Transformed::yes(Arc::new(Column::new(col.name(), idx))))
47 }
48 None => Ok(Transformed::no(e)),
49 })
50 .data()
51}
52
53pub fn physical_exprs_contains(
56 physical_exprs: &[Arc<dyn PhysicalExpr>],
57 expr: &Arc<dyn PhysicalExpr>,
58) -> bool {
59 physical_exprs
60 .iter()
61 .any(|physical_expr| physical_expr.eq(expr))
62}
63
64pub fn physical_exprs_equal(
66 lhs: &[Arc<dyn PhysicalExpr>],
67 rhs: &[Arc<dyn PhysicalExpr>],
68) -> bool {
69 lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
70}
71
72pub fn physical_exprs_bag_equal(
75 lhs: &[Arc<dyn PhysicalExpr>],
76 rhs: &[Arc<dyn PhysicalExpr>],
77) -> bool {
78 let mut multi_set_lhs: HashMap<_, usize> = HashMap::new();
79 let mut multi_set_rhs: HashMap<_, usize> = HashMap::new();
80 for expr in lhs {
81 *multi_set_lhs.entry(expr).or_insert(0) += 1;
82 }
83 for expr in rhs {
84 *multi_set_rhs.entry(expr).or_insert(0) += 1;
85 }
86 multi_set_lhs == multi_set_rhs
87}
88
89pub fn create_ordering(
135 schema: &Schema,
136 sort_order: &[Vec<SortExpr>],
137) -> Result<Vec<LexOrdering>> {
138 let mut all_sort_orders = vec![];
139
140 for (group_idx, exprs) in sort_order.iter().enumerate() {
141 let mut sort_exprs = vec![];
143 for (expr_idx, sort) in exprs.iter().enumerate() {
144 match &sort.expr {
145 Expr::Column(col) => match expressions::col(&col.name, schema) {
146 Ok(expr) => {
147 let opts = SortOptions::new(!sort.asc, sort.nulls_first);
148 sort_exprs.push(PhysicalSortExpr::new(expr, opts));
149 }
150 Err(_) => break,
153 },
154 expr => {
155 return plan_err!(
156 "Expected single column reference in sort_order[{}][{}], got {}",
157 group_idx,
158 expr_idx,
159 expr
160 );
161 }
162 }
163 }
164 all_sort_orders.extend(LexOrdering::new(sort_exprs));
165 }
166 Ok(all_sort_orders)
167}
168
169pub fn create_lex_ordering(
171 schema: &SchemaRef,
172 sort_order: &[Vec<SortExpr>],
173 execution_props: &ExecutionProps,
174) -> Result<Vec<LexOrdering>> {
175 if let Ok(ordering) = create_ordering(schema, sort_order) {
178 return Ok(ordering);
179 }
180
181 let df_schema = DFSchema::try_from(Arc::clone(schema))?;
182
183 let mut all_sort_orders = vec![];
184
185 for exprs in sort_order.iter() {
186 all_sort_orders.extend(LexOrdering::new(create_physical_sort_exprs(
187 exprs,
188 &df_schema,
189 execution_props,
190 )?));
191 }
192 Ok(all_sort_orders)
193}
194
195pub fn create_physical_sort_expr(
197 e: &SortExpr,
198 input_dfschema: &DFSchema,
199 execution_props: &ExecutionProps,
200) -> Result<PhysicalSortExpr> {
201 create_physical_expr(&e.expr, input_dfschema, execution_props).map(|expr| {
202 let options = SortOptions::new(!e.asc, e.nulls_first);
203 PhysicalSortExpr::new(expr, options)
204 })
205}
206
207pub fn create_physical_sort_exprs(
209 exprs: &[SortExpr],
210 input_dfschema: &DFSchema,
211 execution_props: &ExecutionProps,
212) -> Result<Vec<PhysicalSortExpr>> {
213 exprs
214 .iter()
215 .map(|e| create_physical_sort_expr(e, input_dfschema, execution_props))
216 .collect()
217}
218
219pub fn add_offset_to_physical_sort_exprs(
220 sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
221 offset: isize,
222) -> Result<Vec<PhysicalSortExpr>> {
223 sort_exprs
224 .into_iter()
225 .map(|mut sort_expr| {
226 sort_expr.expr = add_offset_to_expr(sort_expr.expr, offset)?;
227 Ok(sort_expr)
228 })
229 .collect()
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    use crate::expressions::{BinaryExpr, Column, Literal};
    use crate::physical_expr::{
        physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
    };
    use datafusion_physical_expr_common::physical_expr::is_volatile;

    use arrow::datatypes::{DataType, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::ColumnarValue;
    use datafusion_expr::Operator;
    use std::any::Any;
    use std::fmt;

    #[test]
    fn test_physical_exprs_contains() {
        let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
            as Arc<dyn PhysicalExpr>;
        let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
            as Arc<dyn PhysicalExpr>;
        let lit4 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(4)))) as Arc<dyn PhysicalExpr>;
        let lit2 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
        let lit1 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
        let col_a_expr = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
        let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
        let col_c_expr = Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>;

        let physical_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::clone(&lit_true),
            Arc::clone(&lit_false),
            Arc::clone(&lit4),
            Arc::clone(&lit2),
            Arc::clone(&col_a_expr),
            Arc::clone(&col_b_expr),
        ];
        // Members are found.
        assert!(physical_exprs_contains(&physical_exprs, &lit_true));
        assert!(physical_exprs_contains(&physical_exprs, &lit2));
        assert!(physical_exprs_contains(&physical_exprs, &col_b_expr));

        // Non-members are not.
        assert!(!physical_exprs_contains(&physical_exprs, &col_c_expr));
        assert!(!physical_exprs_contains(&physical_exprs, &lit1));
    }

    #[test]
    fn test_physical_exprs_equal() {
        let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
            as Arc<dyn PhysicalExpr>;
        let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
            as Arc<dyn PhysicalExpr>;
        let lit1 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
        let lit2 =
            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
        let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;

        let vec1 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
        let vec2 = vec![Arc::clone(&lit_true), Arc::clone(&col_b_expr)];
        let vec3 = vec![Arc::clone(&lit2), Arc::clone(&lit1)];
        let vec4 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];

        // Equal vectors.
        assert!(physical_exprs_equal(&vec1, &vec1));
        assert!(physical_exprs_equal(&vec1, &vec4));
        assert!(physical_exprs_bag_equal(&vec1, &vec1));
        assert!(physical_exprs_bag_equal(&vec1, &vec4));

        // Unequal vectors.
        assert!(!physical_exprs_equal(&vec1, &vec2));
        assert!(!physical_exprs_equal(&vec1, &vec3));
        assert!(!physical_exprs_bag_equal(&vec1, &vec2));
        assert!(!physical_exprs_bag_equal(&vec1, &vec3));

        // Empty inputs are trivially equal.
        assert!(physical_exprs_equal(&[], &[]));
        assert!(physical_exprs_bag_equal(&[], &[]));
    }

    #[test]
    fn test_physical_exprs_set_equal() {
        let list1: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("b", 1)),
        ];
        let list2: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("a", 0)),
        ];
        assert!(!physical_exprs_bag_equal(
            list1.as_slice(),
            list2.as_slice()
        ));
        assert!(!physical_exprs_bag_equal(
            list2.as_slice(),
            list1.as_slice()
        ));
        assert!(!physical_exprs_equal(list1.as_slice(), list2.as_slice()));
        assert!(!physical_exprs_equal(list2.as_slice(), list1.as_slice()));

        let list3: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("c", 2)),
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("b", 1)),
        ];
        let list4: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("b", 1)),
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("c", 2)),
            Arc::new(Column::new("a", 0)),
        ];
        // Same multiset, different order.
        assert!(physical_exprs_bag_equal(list3.as_slice(), list4.as_slice()));
        assert!(physical_exprs_bag_equal(list4.as_slice(), list3.as_slice()));
        assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice()));
        assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice()));
        assert!(!physical_exprs_equal(list3.as_slice(), list4.as_slice()));
        assert!(!physical_exprs_equal(list4.as_slice(), list3.as_slice()));

        // Same distinct element, different multiplicities.
        let single_a: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a", 0))];
        let double_a: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("a", 0)),
            Arc::new(Column::new("a", 0)),
        ];
        assert!(!physical_exprs_bag_equal(&single_a, &double_a));
        assert!(!physical_exprs_bag_equal(&double_a, &single_a));
    }

    #[test]
    fn test_add_offset_to_expr() -> Result<()> {
        let col_b = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;

        let shifted = add_offset_to_expr(Arc::clone(&col_b), 2)?;
        let expected = Arc::new(Column::new("b", 3)) as Arc<dyn PhysicalExpr>;
        assert_eq!(shifted, expected);

        let shifted_back = add_offset_to_expr(shifted, -3)?;
        let expected = Arc::new(Column::new("b", 0)) as Arc<dyn PhysicalExpr>;
        assert_eq!(shifted_back, expected);

        // Shifting below index 0 must be reported rather than wrapping around.
        assert!(add_offset_to_expr(Arc::clone(&col_b), -2).is_err());

        // Non-column leaves are left untouched.
        let literal =
            Arc::new(Literal::new(ScalarValue::Int32(Some(7)))) as Arc<dyn PhysicalExpr>;
        assert_eq!(add_offset_to_expr(Arc::clone(&literal), 5)?, literal);
        Ok(())
    }

    #[test]
    fn test_is_volatile_default_behavior() {
        // Built-in leaf expressions are not volatile by default.
        let literal =
            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let column = Arc::new(Column::new("test", 0)) as Arc<dyn PhysicalExpr>;

        assert!(!literal.is_volatile_node());
        assert!(!column.is_volatile_node());

        assert!(!is_volatile(&literal));
        assert!(!is_volatile(&column));
    }

    /// Leaf expression whose node-level volatility is controlled by a flag.
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    struct MockVolatileExpr {
        volatile: bool,
    }

    impl MockVolatileExpr {
        fn new(volatile: bool) -> Self {
            Self { volatile }
        }
    }

    impl fmt::Display for MockVolatileExpr {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "MockVolatile({})", self.volatile)
        }
    }

    impl PhysicalExpr for MockVolatileExpr {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
            Ok(DataType::Boolean)
        }

        fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
            Ok(false)
        }

        fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
            Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(
                self.volatile,
            ))))
        }

        fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
            vec![]
        }

        fn with_new_children(
            self: Arc<Self>,
            _children: Vec<Arc<dyn PhysicalExpr>>,
        ) -> Result<Arc<dyn PhysicalExpr>> {
            Ok(self)
        }

        fn is_volatile_node(&self) -> bool {
            self.volatile
        }

        fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "mock_volatile({})", self.volatile)
        }
    }

    #[test]
    fn test_nested_expression_volatility() {
        // A volatile leaf is volatile both as a node and as a tree.
        let volatile_expr =
            Arc::new(MockVolatileExpr::new(true)) as Arc<dyn PhysicalExpr>;
        assert!(volatile_expr.is_volatile_node());
        assert!(is_volatile(&volatile_expr));

        let stable_expr = Arc::new(MockVolatileExpr::new(false)) as Arc<dyn PhysicalExpr>;
        assert!(!stable_expr.is_volatile_node());
        assert!(!is_volatile(&stable_expr));

        let literal =
            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        assert!(!literal.is_volatile_node());
        assert!(!is_volatile(&literal));

        // A composite node is not itself volatile, but its tree is when any
        // descendant is.
        let composite_expr = Arc::new(BinaryExpr::new(
            Arc::clone(&volatile_expr),
            Operator::And,
            Arc::clone(&literal),
        )) as Arc<dyn PhysicalExpr>;

        assert!(!composite_expr.is_volatile_node());
        assert!(is_volatile(&composite_expr));

        let stable_composite = Arc::new(BinaryExpr::new(
            Arc::clone(&stable_expr),
            Operator::And,
            Arc::clone(&literal),
        )) as Arc<dyn PhysicalExpr>;

        assert!(!stable_composite.is_volatile_node());
        assert!(!is_volatile(&stable_composite));
    }
}