datafusion_physical_expr/
physical_expr.rs1use std::sync::Arc;
19
20use crate::expressions::{self, Column};
21use crate::{LexOrdering, PhysicalSortExpr, create_physical_expr};
22
23use arrow::compute::SortOptions;
24use arrow::datatypes::{Schema, SchemaRef};
25use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
26use datafusion_common::{DFSchema, HashMap};
27use datafusion_common::{Result, plan_err};
28use datafusion_expr::execution_props::ExecutionProps;
29use datafusion_expr::{Expr, SortExpr};
30
31use itertools::izip;
32pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
34
35pub fn add_offset_to_expr(
38 expr: Arc<dyn PhysicalExpr>,
39 offset: isize,
40) -> Result<Arc<dyn PhysicalExpr>> {
41 expr.transform_down(|e| match e.downcast_ref::<Column>() {
42 Some(col) => {
43 let Some(idx) = col.index().checked_add_signed(offset) else {
44 return plan_err!("Column index overflow");
45 };
46 Ok(Transformed::yes(Arc::new(Column::new(col.name(), idx))))
47 }
48 None => Ok(Transformed::no(e)),
49 })
50 .data()
51}
52
53pub fn physical_exprs_contains(
56 physical_exprs: &[Arc<dyn PhysicalExpr>],
57 expr: &Arc<dyn PhysicalExpr>,
58) -> bool {
59 physical_exprs
60 .iter()
61 .any(|physical_expr| physical_expr.eq(expr))
62}
63
64pub fn physical_exprs_equal(
66 lhs: &[Arc<dyn PhysicalExpr>],
67 rhs: &[Arc<dyn PhysicalExpr>],
68) -> bool {
69 lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
70}
71
72pub fn physical_exprs_bag_equal(
75 lhs: &[Arc<dyn PhysicalExpr>],
76 rhs: &[Arc<dyn PhysicalExpr>],
77) -> bool {
78 let mut multi_set_lhs: HashMap<_, usize> = HashMap::new();
79 let mut multi_set_rhs: HashMap<_, usize> = HashMap::new();
80 for expr in lhs {
81 *multi_set_lhs.entry(expr).or_insert(0) += 1;
82 }
83 for expr in rhs {
84 *multi_set_rhs.entry(expr).or_insert(0) += 1;
85 }
86 multi_set_lhs == multi_set_rhs
87}
88
89pub fn create_ordering(
135 schema: &Schema,
136 sort_order: &[Vec<SortExpr>],
137) -> Result<Vec<LexOrdering>> {
138 let mut all_sort_orders = vec![];
139
140 for (group_idx, exprs) in sort_order.iter().enumerate() {
141 let mut sort_exprs = vec![];
143 for (expr_idx, sort) in exprs.iter().enumerate() {
144 match &sort.expr {
145 Expr::Column(col) => match expressions::col(&col.name, schema) {
146 Ok(expr) => {
147 let opts = SortOptions::new(!sort.asc, sort.nulls_first);
148 sort_exprs.push(PhysicalSortExpr::new(expr, opts));
149 }
150 Err(_) => break,
153 },
154 expr => {
155 return plan_err!(
156 "Expected single column reference in sort_order[{}][{}], got {}",
157 group_idx,
158 expr_idx,
159 expr
160 );
161 }
162 }
163 }
164 all_sort_orders.extend(LexOrdering::new(sort_exprs));
165 }
166 Ok(all_sort_orders)
167}
168
169pub fn create_lex_ordering(
171 schema: &SchemaRef,
172 sort_order: &[Vec<SortExpr>],
173 execution_props: &ExecutionProps,
174) -> Result<Vec<LexOrdering>> {
175 if let Ok(ordering) = create_ordering(schema, sort_order) {
178 return Ok(ordering);
179 }
180
181 let df_schema = DFSchema::try_from(Arc::clone(schema))?;
182
183 let mut all_sort_orders = vec![];
184
185 for exprs in sort_order.iter() {
186 all_sort_orders.extend(LexOrdering::new(create_physical_sort_exprs(
187 exprs,
188 &df_schema,
189 execution_props,
190 )?));
191 }
192 Ok(all_sort_orders)
193}
194
195pub fn create_physical_sort_expr(
197 e: &SortExpr,
198 input_dfschema: &DFSchema,
199 execution_props: &ExecutionProps,
200) -> Result<PhysicalSortExpr> {
201 create_physical_expr(&e.expr, input_dfschema, execution_props).map(|expr| {
202 let options = SortOptions::new(!e.asc, e.nulls_first);
203 PhysicalSortExpr::new(expr, options)
204 })
205}
206
207pub fn create_physical_sort_exprs(
209 exprs: &[SortExpr],
210 input_dfschema: &DFSchema,
211 execution_props: &ExecutionProps,
212) -> Result<Vec<PhysicalSortExpr>> {
213 exprs
214 .iter()
215 .map(|e| create_physical_sort_expr(e, input_dfschema, execution_props))
216 .collect()
217}
218
219pub fn add_offset_to_physical_sort_exprs(
220 sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
221 offset: isize,
222) -> Result<Vec<PhysicalSortExpr>> {
223 sort_exprs
224 .into_iter()
225 .map(|mut sort_expr| {
226 sort_expr.expr = add_offset_to_expr(sort_expr.expr, offset)?;
227 Ok(sort_expr)
228 })
229 .collect()
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    use crate::expressions::{BinaryExpr, Literal};
    use crate::physical_expr::{
        physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
    };
    use datafusion_physical_expr_common::physical_expr::is_volatile;

    use arrow::datatypes::DataType;
    use arrow::record_batch::RecordBatch;
    use datafusion_common::ScalarValue;
    use datafusion_expr::ColumnarValue;
    use datafusion_expr::Operator;
    use std::fmt;

    /// Builds a boolean literal as a type-erased physical expression.
    fn bool_lit(value: bool) -> Arc<dyn PhysicalExpr> {
        Arc::new(Literal::new(ScalarValue::Boolean(Some(value))))
    }

    /// Builds an `Int32` literal as a type-erased physical expression.
    fn int_lit(value: i32) -> Arc<dyn PhysicalExpr> {
        Arc::new(Literal::new(ScalarValue::Int32(Some(value))))
    }

    /// Builds a column reference as a type-erased physical expression.
    fn col_expr(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
        Arc::new(Column::new(name, index))
    }

    #[test]
    fn test_physical_exprs_contains() {
        let truthy = bool_lit(true);
        let falsy = bool_lit(false);
        let four = int_lit(4);
        let two = int_lit(2);
        let one = int_lit(1);
        let col_a = col_expr("a", 0);
        let col_b = col_expr("b", 1);
        let col_c = col_expr("c", 2);

        // The haystack that membership is checked against.
        let haystack: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::clone(&truthy),
            Arc::clone(&falsy),
            Arc::clone(&four),
            Arc::clone(&two),
            Arc::clone(&col_a),
            Arc::clone(&col_b),
        ];

        // Expressions that are present.
        assert!(physical_exprs_contains(&haystack, &truthy));
        assert!(physical_exprs_contains(&haystack, &two));
        assert!(physical_exprs_contains(&haystack, &col_b));

        // Expressions that are absent.
        assert!(!physical_exprs_contains(&haystack, &col_c));
        assert!(!physical_exprs_contains(&haystack, &one));
    }

    #[test]
    fn test_physical_exprs_equal() {
        let truthy = bool_lit(true);
        let falsy = bool_lit(false);
        let one = int_lit(1);
        let two = int_lit(2);
        let col_b = col_expr("b", 1);

        let bools = vec![Arc::clone(&truthy), Arc::clone(&falsy)];
        let bool_and_col = vec![Arc::clone(&truthy), Arc::clone(&col_b)];
        let ints = vec![Arc::clone(&two), Arc::clone(&one)];
        let bools_again = vec![Arc::clone(&truthy), Arc::clone(&falsy)];

        // Identical contents compare equal, both positionally and as bags.
        assert!(physical_exprs_equal(&bools, &bools));
        assert!(physical_exprs_equal(&bools, &bools_again));
        assert!(physical_exprs_bag_equal(&bools, &bools));
        assert!(physical_exprs_bag_equal(&bools, &bools_again));

        // Different contents compare unequal under both notions.
        assert!(!physical_exprs_equal(&bools, &bool_and_col));
        assert!(!physical_exprs_equal(&bools, &ints));
        assert!(!physical_exprs_bag_equal(&bools, &bool_and_col));
        assert!(!physical_exprs_bag_equal(&bools, &ints));
    }

    #[test]
    fn test_physical_exprs_set_equal() {
        // Same distinct expressions, but with different multiplicities.
        let list1 = vec![col_expr("a", 0), col_expr("a", 0), col_expr("b", 1)];
        let list2 = vec![col_expr("b", 1), col_expr("b", 1), col_expr("a", 0)];

        assert!(!physical_exprs_bag_equal(&list1, &list2));
        assert!(!physical_exprs_bag_equal(&list2, &list1));
        assert!(!physical_exprs_equal(&list1, &list2));
        assert!(!physical_exprs_equal(&list2, &list1));

        // Same multiplicities, but in a different order.
        let list3 = vec![
            col_expr("a", 0),
            col_expr("b", 1),
            col_expr("c", 2),
            col_expr("a", 0),
            col_expr("b", 1),
        ];
        let list4 = vec![
            col_expr("b", 1),
            col_expr("b", 1),
            col_expr("a", 0),
            col_expr("c", 2),
            col_expr("a", 0),
        ];

        assert!(physical_exprs_bag_equal(&list3, &list4));
        assert!(physical_exprs_bag_equal(&list4, &list3));
        assert!(physical_exprs_bag_equal(&list3, &list3));
        assert!(physical_exprs_bag_equal(&list4, &list4));
        assert!(!physical_exprs_equal(&list3, &list4));
        assert!(!physical_exprs_equal(&list4, &list3));
        assert!(physical_exprs_bag_equal(&list3, &list3));
        assert!(physical_exprs_bag_equal(&list4, &list4));
    }

    #[test]
    fn test_is_volatile_default_behavior() {
        let literal = int_lit(42);
        let column = col_expr("test", 0);

        // Neither node reports itself as volatile...
        assert!(!literal.is_volatile_node());
        assert!(!column.is_volatile_node());

        // ...and neither expression is volatile as a whole.
        assert!(!is_volatile(&literal));
        assert!(!is_volatile(&column));
    }

    /// Leaf expression whose node-level volatility is set at construction.
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    struct MockVolatileExpr {
        volatile: bool,
    }

    impl MockVolatileExpr {
        fn new(volatile: bool) -> Self {
            MockVolatileExpr { volatile }
        }
    }

    impl fmt::Display for MockVolatileExpr {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "MockVolatile({})", self.volatile)
        }
    }

    impl PhysicalExpr for MockVolatileExpr {
        fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
            Ok(DataType::Boolean)
        }

        fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
            Ok(false)
        }

        fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
            let flag = ScalarValue::Boolean(Some(self.volatile));
            Ok(ColumnarValue::Scalar(flag))
        }

        fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
            Vec::new()
        }

        fn with_new_children(
            self: Arc<Self>,
            _children: Vec<Arc<dyn PhysicalExpr>>,
        ) -> Result<Arc<dyn PhysicalExpr>> {
            Ok(self)
        }

        fn is_volatile_node(&self) -> bool {
            self.volatile
        }

        fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "mock_volatile({})", self.volatile)
        }
    }

    #[test]
    fn test_nested_expression_volatility() {
        // A leaf that declares itself volatile.
        let volatile_expr: Arc<dyn PhysicalExpr> = Arc::new(MockVolatileExpr::new(true));
        assert!(volatile_expr.is_volatile_node());
        assert!(is_volatile(&volatile_expr));

        // A leaf that declares itself stable.
        let stable_expr: Arc<dyn PhysicalExpr> = Arc::new(MockVolatileExpr::new(false));
        assert!(!stable_expr.is_volatile_node());
        assert!(!is_volatile(&stable_expr));

        // A plain literal.
        let literal = int_lit(42);
        assert!(!literal.is_volatile_node());
        assert!(!is_volatile(&literal));

        // The binary node itself is not volatile, but the expression is,
        // because one of its children is volatile.
        let composite_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::clone(&volatile_expr),
            Operator::And,
            Arc::clone(&literal),
        ));
        assert!(!composite_expr.is_volatile_node());
        assert!(is_volatile(&composite_expr));

        // With only stable children, neither the node nor the expression is
        // volatile.
        let stable_composite: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::clone(&stable_expr),
            Operator::And,
            Arc::clone(&literal),
        ));
        assert!(!stable_composite.is_volatile_node());
        assert!(!is_volatile(&stable_composite));
    }
}