1use std::sync::Arc;
5
6use arrow_schema::DataType;
7use arrow_schema::Schema;
8use datafusion_expr::Operator as DFOperator;
9use datafusion_functions::core::getfield::GetFieldFunc;
10use datafusion_physical_expr::PhysicalExpr;
11use datafusion_physical_expr::ScalarFunctionExpr;
12use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
13use datafusion_physical_plan::expressions as df_expr;
14use itertools::Itertools;
15use vortex::compute::LikeOptions;
16use vortex::dtype::DType;
17use vortex::dtype::Nullability;
18use vortex::dtype::arrow::FromArrowType;
19use vortex::error::VortexResult;
20use vortex::error::vortex_bail;
21use vortex::error::vortex_err;
22use vortex::expr::Binary;
23use vortex::expr::Expression;
24use vortex::expr::Like;
25use vortex::expr::Operator;
26use vortex::expr::VTableExt;
27use vortex::expr::and;
28use vortex::expr::cast;
29use vortex::expr::get_item;
30use vortex::expr::is_null;
31use vortex::expr::list_contains;
32use vortex::expr::lit;
33use vortex::expr::not;
34use vortex::expr::root;
35use vortex::scalar::Scalar;
36
37use crate::convert::FromDataFusion;
38
39pub(crate) fn make_vortex_predicate(
41 expr_convertor: &dyn ExpressionConvertor,
42 predicate: &[Arc<dyn PhysicalExpr>],
43) -> VortexResult<Option<Expression>> {
44 let exprs = predicate
45 .iter()
46 .map(|e| expr_convertor.convert(e.as_ref()))
47 .collect::<VortexResult<Vec<_>>>()?;
48
49 Ok(exprs.into_iter().reduce(and))
50}
51
/// Converts DataFusion physical expressions into Vortex [`Expression`]s.
pub trait ExpressionConvertor: Send + Sync {
    /// Returns true if `expr` can be evaluated by Vortex against `schema`,
    /// i.e. it is safe to push the expression down to the scan.
    fn can_be_pushed_down(&self, expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool;

    /// Converts `expr` into an equivalent Vortex [`Expression`], erroring if
    /// the expression (or any sub-expression) is unsupported.
    fn convert(&self, expr: &dyn PhysicalExpr) -> VortexResult<Expression>;
}
60
/// Default [`ExpressionConvertor`] covering the common DataFusion physical
/// expression nodes: columns, literals, binary operators, LIKE, casts,
/// null checks, IN-lists, and the `get_field` scalar function.
#[derive(Default)]
pub struct DefaultExpressionConvertor {}
64
65impl DefaultExpressionConvertor {
66 fn try_convert_scalar_function(
68 &self,
69 scalar_fn: &ScalarFunctionExpr,
70 ) -> VortexResult<Expression> {
71 if let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn)
72 {
73 let source_expr = get_field_fn
74 .args()
75 .first()
76 .ok_or_else(|| vortex_err!("get_field missing source expression"))?
77 .as_ref();
78 let field_name_expr = get_field_fn
79 .args()
80 .get(1)
81 .ok_or_else(|| vortex_err!("get_field missing field name argument"))?;
82 let field_name = field_name_expr
83 .as_any()
84 .downcast_ref::<df_expr::Literal>()
85 .ok_or_else(|| vortex_err!("get_field field name must be a literal"))?
86 .value()
87 .try_as_str()
88 .flatten()
89 .ok_or_else(|| vortex_err!("get_field field name must be a UTF-8 string"))?;
90 return Ok(get_item(field_name.to_string(), self.convert(source_expr)?));
91 }
92
93 tracing::debug!(
94 function_name = scalar_fn.name(),
95 "Unsupported ScalarFunctionExpr"
96 );
97 vortex_bail!("Unsupported ScalarFunctionExpr: {}", scalar_fn.name())
98 }
99}
100
101impl ExpressionConvertor for DefaultExpressionConvertor {
102 fn can_be_pushed_down(&self, expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
103 can_be_pushed_down(expr, schema)
104 }
105
106 fn convert(&self, df: &dyn PhysicalExpr) -> VortexResult<Expression> {
107 if let Some(binary_expr) = df.as_any().downcast_ref::<df_expr::BinaryExpr>() {
110 let left = self.convert(binary_expr.left().as_ref())?;
111 let right = self.convert(binary_expr.right().as_ref())?;
112 let operator = try_operator_from_df(binary_expr.op())?;
113
114 return Ok(Binary.new_expr(operator, [left, right]));
115 }
116
117 if let Some(col_expr) = df.as_any().downcast_ref::<df_expr::Column>() {
118 return Ok(get_item(col_expr.name().to_owned(), root()));
119 }
120
121 if let Some(like) = df.as_any().downcast_ref::<df_expr::LikeExpr>() {
122 let child = self.convert(like.expr().as_ref())?;
123 let pattern = self.convert(like.pattern().as_ref())?;
124 return Ok(Like.new_expr(
125 LikeOptions {
126 negated: like.negated(),
127 case_insensitive: like.case_insensitive(),
128 },
129 [child, pattern],
130 ));
131 }
132
133 if let Some(literal) = df.as_any().downcast_ref::<df_expr::Literal>() {
134 let value = Scalar::from_df(literal.value());
135 return Ok(lit(value));
136 }
137
138 if let Some(cast_expr) = df.as_any().downcast_ref::<df_expr::CastExpr>() {
139 let cast_dtype = DType::from_arrow((cast_expr.cast_type(), Nullability::Nullable));
140 let child = self.convert(cast_expr.expr().as_ref())?;
141 return Ok(cast(child, cast_dtype));
142 }
143
144 if let Some(cast_col_expr) = df.as_any().downcast_ref::<df_expr::CastColumnExpr>() {
145 let target = cast_col_expr.target_field();
146
147 let target_dtype = DType::from_arrow((target.data_type(), target.is_nullable().into()));
148 let child = self.convert(cast_col_expr.expr().as_ref())?;
149 return Ok(cast(child, target_dtype));
150 }
151
152 if let Some(is_null_expr) = df.as_any().downcast_ref::<df_expr::IsNullExpr>() {
153 let arg = self.convert(is_null_expr.arg().as_ref())?;
154 return Ok(is_null(arg));
155 }
156
157 if let Some(is_not_null_expr) = df.as_any().downcast_ref::<df_expr::IsNotNullExpr>() {
158 let arg = self.convert(is_not_null_expr.arg().as_ref())?;
159 return Ok(not(is_null(arg)));
160 }
161
162 if let Some(in_list) = df.as_any().downcast_ref::<df_expr::InListExpr>() {
163 let value = self.convert(in_list.expr().as_ref())?;
164 let list_elements: Vec<_> = in_list
165 .list()
166 .iter()
167 .map(|e| {
168 if let Some(lit) = e.as_any().downcast_ref::<df_expr::Literal>() {
169 Ok(Scalar::from_df(lit.value()))
170 } else {
171 Err(vortex_err!("Failed to cast sub-expression"))
172 }
173 })
174 .try_collect()?;
175
176 let list = Scalar::list(
177 list_elements[0].dtype().clone(),
178 list_elements,
179 Nullability::Nullable,
180 );
181 let expr = list_contains(lit(list), value);
182
183 return Ok(if in_list.negated() { not(expr) } else { expr });
184 }
185
186 if let Some(scalar_fn) = df.as_any().downcast_ref::<ScalarFunctionExpr>() {
187 return self.try_convert_scalar_function(scalar_fn);
188 }
189
190 vortex_bail!("Couldn't convert DataFusion physical {df} expression to a vortex expression")
191 }
192}
193
194fn try_operator_from_df(value: &DFOperator) -> VortexResult<Operator> {
195 match value {
196 DFOperator::Eq => Ok(Operator::Eq),
197 DFOperator::NotEq => Ok(Operator::NotEq),
198 DFOperator::Lt => Ok(Operator::Lt),
199 DFOperator::LtEq => Ok(Operator::Lte),
200 DFOperator::Gt => Ok(Operator::Gt),
201 DFOperator::GtEq => Ok(Operator::Gte),
202 DFOperator::And => Ok(Operator::And),
203 DFOperator::Or => Ok(Operator::Or),
204 DFOperator::Plus => Ok(Operator::Add),
205 DFOperator::Minus => Ok(Operator::Sub),
206 DFOperator::Multiply => Ok(Operator::Mul),
207 DFOperator::Divide => Ok(Operator::Div),
208 DFOperator::IsDistinctFrom
209 | DFOperator::IsNotDistinctFrom
210 | DFOperator::RegexMatch
211 | DFOperator::RegexIMatch
212 | DFOperator::RegexNotMatch
213 | DFOperator::RegexNotIMatch
214 | DFOperator::LikeMatch
215 | DFOperator::ILikeMatch
216 | DFOperator::NotLikeMatch
217 | DFOperator::NotILikeMatch
218 | DFOperator::BitwiseAnd
219 | DFOperator::BitwiseOr
220 | DFOperator::BitwiseXor
221 | DFOperator::BitwiseShiftRight
222 | DFOperator::BitwiseShiftLeft
223 | DFOperator::StringConcat
224 | DFOperator::AtArrow
225 | DFOperator::ArrowAt
226 | DFOperator::Modulo
227 | DFOperator::Arrow
228 | DFOperator::LongArrow
229 | DFOperator::HashArrow
230 | DFOperator::HashLongArrow
231 | DFOperator::AtAt
232 | DFOperator::IntegerDivide
233 | DFOperator::HashMinus
234 | DFOperator::AtQuestion
235 | DFOperator::Question
236 | DFOperator::QuestionAnd
237 | DFOperator::QuestionPipe => {
238 tracing::debug!(operator = %value, "Can't pushdown binary_operator operator");
239 Err(vortex_err!("Unsupported datafusion operator {value}"))
240 }
241 }
242}
243
244pub(crate) fn can_be_pushed_down(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
245 if is_dynamic_physical_expr(df_expr) {
248 return false;
249 }
250
251 let expr = df_expr.as_any();
252 if let Some(binary) = expr.downcast_ref::<df_expr::BinaryExpr>() {
253 can_binary_be_pushed_down(binary, schema)
254 } else if let Some(col) = expr.downcast_ref::<df_expr::Column>() {
255 schema
256 .field_with_name(col.name())
257 .ok()
258 .is_some_and(|field| supported_data_types(field.data_type()))
259 } else if let Some(like) = expr.downcast_ref::<df_expr::LikeExpr>() {
260 can_be_pushed_down(like.expr(), schema) && can_be_pushed_down(like.pattern(), schema)
261 } else if let Some(lit) = expr.downcast_ref::<df_expr::Literal>() {
262 supported_data_types(&lit.value().data_type())
263 } else if expr.downcast_ref::<df_expr::CastExpr>().is_some()
264 || expr.downcast_ref::<df_expr::CastColumnExpr>().is_some()
265 {
266 true
267 } else if let Some(is_null) = expr.downcast_ref::<df_expr::IsNullExpr>() {
268 can_be_pushed_down(is_null.arg(), schema)
269 } else if let Some(is_not_null) = expr.downcast_ref::<df_expr::IsNotNullExpr>() {
270 can_be_pushed_down(is_not_null.arg(), schema)
271 } else if let Some(in_list) = expr.downcast_ref::<df_expr::InListExpr>() {
272 can_be_pushed_down(in_list.expr(), schema)
273 && in_list.list().iter().all(|e| can_be_pushed_down(e, schema))
274 } else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
275 can_scalar_fn_be_pushed_down(scalar_fn)
276 } else {
277 tracing::debug!(%df_expr, "DataFusion expression can't be pushed down");
278 false
279 }
280}
281
282fn can_binary_be_pushed_down(binary: &df_expr::BinaryExpr, schema: &Schema) -> bool {
283 let is_op_supported = try_operator_from_df(binary.op()).is_ok();
284 is_op_supported
285 && can_be_pushed_down(binary.left(), schema)
286 && can_be_pushed_down(binary.right(), schema)
287}
288
289fn supported_data_types(dt: &DataType) -> bool {
290 use DataType::*;
291
292 if let Dictionary(_, value_type) = dt {
294 return supported_data_types(value_type.as_ref());
295 }
296
297 let is_supported = dt.is_null()
298 || dt.is_numeric()
299 || matches!(
300 dt,
301 Boolean
302 | Utf8
303 | LargeUtf8
304 | Utf8View
305 | Binary
306 | LargeBinary
307 | BinaryView
308 | Date32
309 | Date64
310 | Timestamp(_, _)
311 | Time32(_)
312 | Time64(_)
313 );
314
315 if !is_supported {
316 tracing::debug!("DataFusion data type {dt:?} is not supported");
317 }
318
319 is_supported
320}
321
/// Only `get_field` scalar function calls can be pushed down; see
/// `DefaultExpressionConvertor::try_convert_scalar_function` for how they
/// are converted.
fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr) -> bool {
    ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn).is_some()
}
326
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use arrow_schema::TimeUnit as ArrowTimeUnit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::Operator as DFOperator;
    use datafusion_physical_expr::PhysicalExpr;
    use datafusion_physical_plan::expressions as df_expr;
    use insta::assert_snapshot;
    use rstest::rstest;

    use super::*;

    /// Schema fixture mixing supported column types and one unsupported
    /// list-typed column for negative tests.
    #[rstest::fixture]
    fn test_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("score", DataType::Float64, true),
            Field::new("active", DataType::Boolean, false),
            Field::new(
                "created_at",
                DataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "unsupported_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
        ])
    }

    #[test]
    fn test_make_vortex_predicate_empty() {
        let expr_convertor = DefaultExpressionConvertor::default();
        let result = make_vortex_predicate(&expr_convertor, &[]).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_make_vortex_predicate_single() {
        let expr_convertor = DefaultExpressionConvertor::default();
        let col_expr = Arc::new(df_expr::Column::new("test", 0)) as Arc<dyn PhysicalExpr>;
        let result = make_vortex_predicate(&expr_convertor, &[col_expr]).unwrap();
        assert!(result.is_some());
    }

    #[test]
    fn test_make_vortex_predicate_multiple() {
        let expr_convertor = DefaultExpressionConvertor::default();
        let col1 = Arc::new(df_expr::Column::new("col1", 0)) as Arc<dyn PhysicalExpr>;
        let col2 = Arc::new(df_expr::Column::new("col2", 1)) as Arc<dyn PhysicalExpr>;
        let result = make_vortex_predicate(&expr_convertor, &[col1, col2]).unwrap();
        assert!(result.is_some());
    }

    // NOTE: case names previously repeated `plus` for the Minus/Multiply/
    // Divide rows, which made the generated test names misleading.
    #[rstest]
    #[case::eq(DFOperator::Eq, Operator::Eq)]
    #[case::not_eq(DFOperator::NotEq, Operator::NotEq)]
    #[case::lt(DFOperator::Lt, Operator::Lt)]
    #[case::lte(DFOperator::LtEq, Operator::Lte)]
    #[case::gt(DFOperator::Gt, Operator::Gt)]
    #[case::gte(DFOperator::GtEq, Operator::Gte)]
    #[case::and(DFOperator::And, Operator::And)]
    #[case::or(DFOperator::Or, Operator::Or)]
    #[case::plus(DFOperator::Plus, Operator::Add)]
    #[case::minus(DFOperator::Minus, Operator::Sub)]
    #[case::multiply(DFOperator::Multiply, Operator::Mul)]
    #[case::divide(DFOperator::Divide, Operator::Div)]
    fn test_operator_conversion_supported(
        #[case] df_op: DFOperator,
        #[case] expected_vortex_op: Operator,
    ) {
        let result = try_operator_from_df(&df_op).unwrap();
        assert_eq!(result, expected_vortex_op);
    }

    #[rstest]
    #[case::modulo(DFOperator::Modulo)]
    #[case::bitwise_and(DFOperator::BitwiseAnd)]
    #[case::regex_match(DFOperator::RegexMatch)]
    #[case::like_match(DFOperator::LikeMatch)]
    fn test_operator_conversion_unsupported(#[case] df_op: DFOperator) {
        let result = try_operator_from_df(&df_op);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Unsupported datafusion operator")
        );
    }

    #[test]
    fn test_expr_from_df_column() {
        let col_expr = df_expr::Column::new("test_column", 0);
        let result = DefaultExpressionConvertor::default()
            .convert(&col_expr)
            .unwrap();

        assert_snapshot!(result.display_tree().to_string(), @r"
        vortex.get_item(test_column)
        └── input: vortex.root()
        ");
    }

    #[test]
    fn test_expr_from_df_literal() {
        let literal_expr = df_expr::Literal::new(ScalarValue::Int32(Some(42)));
        let result = DefaultExpressionConvertor::default()
            .convert(&literal_expr)
            .unwrap();

        assert_snapshot!(result.display_tree().to_string(), @"vortex.literal(42i32)");
    }

    #[test]
    fn test_expr_from_df_binary() {
        let left = Arc::new(df_expr::Column::new("left", 0)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = df_expr::BinaryExpr::new(left, DFOperator::Eq, right);

        let result = DefaultExpressionConvertor::default()
            .convert(&binary_expr)
            .unwrap();

        assert_snapshot!(result.display_tree().to_string(), @r"
        vortex.binary(=)
        ├── lhs: vortex.get_item(left)
        │   └── input: vortex.root()
        └── rhs: vortex.literal(42i32)
        ");
    }

    #[rstest]
    #[case::like_normal(false, false)]
    #[case::like_negated(true, false)]
    #[case::like_case_insensitive(false, true)]
    #[case::like_negated_case_insensitive(true, true)]
    fn test_expr_from_df_like(#[case] negated: bool, #[case] case_insensitive: bool) {
        let expr = Arc::new(df_expr::Column::new("text_col", 0)) as Arc<dyn PhysicalExpr>;
        let pattern = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
            "test%".to_string(),
        )))) as Arc<dyn PhysicalExpr>;
        let like_expr = df_expr::LikeExpr::new(negated, case_insensitive, expr, pattern);

        let result = DefaultExpressionConvertor::default()
            .convert(&like_expr)
            .unwrap();
        let like_opts = result.as_::<Like>();
        assert_eq!(
            like_opts,
            &LikeOptions {
                negated,
                case_insensitive
            }
        );
    }

    #[rstest]
    #[case::null(DataType::Null, true)]
    #[case::boolean(DataType::Boolean, true)]
    #[case::int8(DataType::Int8, true)]
    #[case::int16(DataType::Int16, true)]
    #[case::int32(DataType::Int32, true)]
    #[case::int64(DataType::Int64, true)]
    #[case::uint8(DataType::UInt8, true)]
    #[case::uint16(DataType::UInt16, true)]
    #[case::uint32(DataType::UInt32, true)]
    #[case::uint64(DataType::UInt64, true)]
    #[case::float32(DataType::Float32, true)]
    #[case::float64(DataType::Float64, true)]
    #[case::utf8(DataType::Utf8, true)]
    #[case::utf8_view(DataType::Utf8View, true)]
    #[case::binary(DataType::Binary, true)]
    #[case::binary_view(DataType::BinaryView, true)]
    #[case::date32(DataType::Date32, true)]
    #[case::date64(DataType::Date64, true)]
    #[case::timestamp_ms(DataType::Timestamp(ArrowTimeUnit::Millisecond, None), true)]
    #[case::timestamp_us(
        DataType::Timestamp(ArrowTimeUnit::Microsecond, Some(Arc::from("UTC"))),
        true
    )]
    #[case::time32_s(DataType::Time32(ArrowTimeUnit::Second), true)]
    #[case::time64_ns(DataType::Time64(ArrowTimeUnit::Nanosecond), true)]
    #[case::list(
        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
        false
    )]
    #[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()
    ), false)]
    #[case::dict_utf8(
        DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
        true
    )]
    #[case::dict_int32(
        DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Int32)),
        true
    )]
    #[case::dict_unsupported(
        DataType::Dictionary(
            Box::new(DataType::UInt32),
            Box::new(DataType::List(Arc::new(Field::new("item", DataType::Int32, true))))
        ),
        false
    )]
    fn test_supported_data_types(#[case] data_type: DataType, #[case] expected: bool) {
        assert_eq!(supported_data_types(&data_type), expected);
    }

    #[rstest]
    fn test_can_be_pushed_down_column_supported(test_schema: Schema) {
        let col_expr = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down(&col_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_column_unsupported_type(test_schema: Schema) {
        let col_expr =
            Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down(&col_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_column_not_found(test_schema: Schema) {
        let col_expr = Arc::new(df_expr::Column::new("nonexistent", 99)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down(&col_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_literal_supported(test_schema: Schema) {
        let lit_expr =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down(&lit_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_literal_unsupported(test_schema: Schema) {
        let unsupported_literal = ScalarValue::DurationSecond(Some(42));
        let lit_expr =
            Arc::new(df_expr::Literal::new(unsupported_literal)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down(&lit_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_binary_supported(test_schema: Schema) {
        let left = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = Arc::new(df_expr::BinaryExpr::new(left, DFOperator::Eq, right))
            as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down(&binary_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_binary_unsupported_operator(test_schema: Schema) {
        let left = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = Arc::new(df_expr::BinaryExpr::new(
            left,
            DFOperator::AtQuestion,
            right,
        )) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down(&binary_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_binary_unsupported_operand(test_schema: Schema) {
        let left = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = Arc::new(df_expr::BinaryExpr::new(left, DFOperator::Eq, right))
            as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down(&binary_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_like_supported(test_schema: Schema) {
        let expr = Arc::new(df_expr::Column::new("name", 1)) as Arc<dyn PhysicalExpr>;
        let pattern = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
            "test%".to_string(),
        )))) as Arc<dyn PhysicalExpr>;
        let like_expr =
            Arc::new(df_expr::LikeExpr::new(false, false, expr, pattern)) as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down(&like_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_like_unsupported_operand(test_schema: Schema) {
        let expr = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc<dyn PhysicalExpr>;
        let pattern = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
            "test%".to_string(),
        )))) as Arc<dyn PhysicalExpr>;
        let like_expr =
            Arc::new(df_expr::LikeExpr::new(false, false, expr, pattern)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down(&like_expr, &test_schema));
    }
}