vortex_datafusion/convert/
exprs.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use arrow_schema::DataType;
7use arrow_schema::Schema;
8use datafusion_expr::Operator as DFOperator;
9use datafusion_functions::core::getfield::GetFieldFunc;
10use datafusion_physical_expr::PhysicalExpr;
11use datafusion_physical_expr::ScalarFunctionExpr;
12use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
13use datafusion_physical_plan::expressions as df_expr;
14use itertools::Itertools;
15use vortex::compute::LikeOptions;
16use vortex::dtype::DType;
17use vortex::dtype::Nullability;
18use vortex::dtype::arrow::FromArrowType;
19use vortex::error::VortexResult;
20use vortex::error::vortex_bail;
21use vortex::error::vortex_err;
22use vortex::expr::Binary;
23use vortex::expr::Expression;
24use vortex::expr::Like;
25use vortex::expr::Operator;
26use vortex::expr::VTableExt;
27use vortex::expr::and;
28use vortex::expr::cast;
29use vortex::expr::get_item;
30use vortex::expr::is_null;
31use vortex::expr::list_contains;
32use vortex::expr::lit;
33use vortex::expr::not;
34use vortex::expr::root;
35use vortex::scalar::Scalar;
36
37use crate::convert::FromDataFusion;
38
39/// Tries to convert the expressions into a vortex conjunction. Will return Ok(None) iff the input conjunction is empty.
40pub(crate) fn make_vortex_predicate(
41    expr_convertor: &dyn ExpressionConvertor,
42    predicate: &[Arc<dyn PhysicalExpr>],
43) -> VortexResult<Option<Expression>> {
44    let exprs = predicate
45        .iter()
46        .map(|e| expr_convertor.convert(e.as_ref()))
47        .collect::<VortexResult<Vec<_>>>()?;
48
49    Ok(exprs.into_iter().reduce(and))
50}
51
/// Trait for converting DataFusion expressions to Vortex ones.
pub trait ExpressionConvertor: Send + Sync {
    /// Can an expression be pushed down given a specific schema.
    ///
    /// Intended as a conservative pre-check for [`ExpressionConvertor::convert`]:
    /// expressions reported as pushable are expected to convert successfully.
    fn can_be_pushed_down(&self, expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool;

    /// Try and convert a DataFusion [`PhysicalExpr`] into a Vortex [`Expression`].
    fn convert(&self, expr: &dyn PhysicalExpr) -> VortexResult<Expression>;
}
60
/// The default [`ExpressionConvertor`].
///
/// Handles binary operators, column references, LIKE, literals, casts, NULL checks,
/// IN lists, and the `get_field` scalar function (see the `convert` impl below).
#[derive(Default)]
pub struct DefaultExpressionConvertor {}
64
65impl DefaultExpressionConvertor {
66    /// Attempts to convert a DataFusion ScalarFunctionExpr to a Vortex expression.
67    fn try_convert_scalar_function(
68        &self,
69        scalar_fn: &ScalarFunctionExpr,
70    ) -> VortexResult<Expression> {
71        if let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn)
72        {
73            let source_expr = get_field_fn
74                .args()
75                .first()
76                .ok_or_else(|| vortex_err!("get_field missing source expression"))?
77                .as_ref();
78            let field_name_expr = get_field_fn
79                .args()
80                .get(1)
81                .ok_or_else(|| vortex_err!("get_field missing field name argument"))?;
82            let field_name = field_name_expr
83                .as_any()
84                .downcast_ref::<df_expr::Literal>()
85                .ok_or_else(|| vortex_err!("get_field field name must be a literal"))?
86                .value()
87                .try_as_str()
88                .flatten()
89                .ok_or_else(|| vortex_err!("get_field field name must be a UTF-8 string"))?;
90            return Ok(get_item(field_name.to_string(), self.convert(source_expr)?));
91        }
92
93        tracing::debug!(
94            function_name = scalar_fn.name(),
95            "Unsupported ScalarFunctionExpr"
96        );
97        vortex_bail!("Unsupported ScalarFunctionExpr: {}", scalar_fn.name())
98    }
99}
100
101impl ExpressionConvertor for DefaultExpressionConvertor {
102    fn can_be_pushed_down(&self, expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
103        can_be_pushed_down(expr, schema)
104    }
105
106    fn convert(&self, df: &dyn PhysicalExpr) -> VortexResult<Expression> {
107        // TODO(joe): Don't return an error when we have an unsupported node, bubble up "TRUE" as in keep
108        //  for that node, up to any `and` or `or` node.
109        if let Some(binary_expr) = df.as_any().downcast_ref::<df_expr::BinaryExpr>() {
110            let left = self.convert(binary_expr.left().as_ref())?;
111            let right = self.convert(binary_expr.right().as_ref())?;
112            let operator = try_operator_from_df(binary_expr.op())?;
113
114            return Ok(Binary.new_expr(operator, [left, right]));
115        }
116
117        if let Some(col_expr) = df.as_any().downcast_ref::<df_expr::Column>() {
118            return Ok(get_item(col_expr.name().to_owned(), root()));
119        }
120
121        if let Some(like) = df.as_any().downcast_ref::<df_expr::LikeExpr>() {
122            let child = self.convert(like.expr().as_ref())?;
123            let pattern = self.convert(like.pattern().as_ref())?;
124            return Ok(Like.new_expr(
125                LikeOptions {
126                    negated: like.negated(),
127                    case_insensitive: like.case_insensitive(),
128                },
129                [child, pattern],
130            ));
131        }
132
133        if let Some(literal) = df.as_any().downcast_ref::<df_expr::Literal>() {
134            let value = Scalar::from_df(literal.value());
135            return Ok(lit(value));
136        }
137
138        if let Some(cast_expr) = df.as_any().downcast_ref::<df_expr::CastExpr>() {
139            let cast_dtype = DType::from_arrow((cast_expr.cast_type(), Nullability::Nullable));
140            let child = self.convert(cast_expr.expr().as_ref())?;
141            return Ok(cast(child, cast_dtype));
142        }
143
144        if let Some(cast_col_expr) = df.as_any().downcast_ref::<df_expr::CastColumnExpr>() {
145            let target = cast_col_expr.target_field();
146
147            let target_dtype = DType::from_arrow((target.data_type(), target.is_nullable().into()));
148            let child = self.convert(cast_col_expr.expr().as_ref())?;
149            return Ok(cast(child, target_dtype));
150        }
151
152        if let Some(is_null_expr) = df.as_any().downcast_ref::<df_expr::IsNullExpr>() {
153            let arg = self.convert(is_null_expr.arg().as_ref())?;
154            return Ok(is_null(arg));
155        }
156
157        if let Some(is_not_null_expr) = df.as_any().downcast_ref::<df_expr::IsNotNullExpr>() {
158            let arg = self.convert(is_not_null_expr.arg().as_ref())?;
159            return Ok(not(is_null(arg)));
160        }
161
162        if let Some(in_list) = df.as_any().downcast_ref::<df_expr::InListExpr>() {
163            let value = self.convert(in_list.expr().as_ref())?;
164            let list_elements: Vec<_> = in_list
165                .list()
166                .iter()
167                .map(|e| {
168                    if let Some(lit) = e.as_any().downcast_ref::<df_expr::Literal>() {
169                        Ok(Scalar::from_df(lit.value()))
170                    } else {
171                        Err(vortex_err!("Failed to cast sub-expression"))
172                    }
173                })
174                .try_collect()?;
175
176            let list = Scalar::list(
177                list_elements[0].dtype().clone(),
178                list_elements,
179                Nullability::Nullable,
180            );
181            let expr = list_contains(lit(list), value);
182
183            return Ok(if in_list.negated() { not(expr) } else { expr });
184        }
185
186        if let Some(scalar_fn) = df.as_any().downcast_ref::<ScalarFunctionExpr>() {
187            return self.try_convert_scalar_function(scalar_fn);
188        }
189
190        vortex_bail!("Couldn't convert DataFusion physical {df} expression to a vortex expression")
191    }
192}
193
194fn try_operator_from_df(value: &DFOperator) -> VortexResult<Operator> {
195    match value {
196        DFOperator::Eq => Ok(Operator::Eq),
197        DFOperator::NotEq => Ok(Operator::NotEq),
198        DFOperator::Lt => Ok(Operator::Lt),
199        DFOperator::LtEq => Ok(Operator::Lte),
200        DFOperator::Gt => Ok(Operator::Gt),
201        DFOperator::GtEq => Ok(Operator::Gte),
202        DFOperator::And => Ok(Operator::And),
203        DFOperator::Or => Ok(Operator::Or),
204        DFOperator::Plus => Ok(Operator::Add),
205        DFOperator::Minus => Ok(Operator::Sub),
206        DFOperator::Multiply => Ok(Operator::Mul),
207        DFOperator::Divide => Ok(Operator::Div),
208        DFOperator::IsDistinctFrom
209        | DFOperator::IsNotDistinctFrom
210        | DFOperator::RegexMatch
211        | DFOperator::RegexIMatch
212        | DFOperator::RegexNotMatch
213        | DFOperator::RegexNotIMatch
214        | DFOperator::LikeMatch
215        | DFOperator::ILikeMatch
216        | DFOperator::NotLikeMatch
217        | DFOperator::NotILikeMatch
218        | DFOperator::BitwiseAnd
219        | DFOperator::BitwiseOr
220        | DFOperator::BitwiseXor
221        | DFOperator::BitwiseShiftRight
222        | DFOperator::BitwiseShiftLeft
223        | DFOperator::StringConcat
224        | DFOperator::AtArrow
225        | DFOperator::ArrowAt
226        | DFOperator::Modulo
227        | DFOperator::Arrow
228        | DFOperator::LongArrow
229        | DFOperator::HashArrow
230        | DFOperator::HashLongArrow
231        | DFOperator::AtAt
232        | DFOperator::IntegerDivide
233        | DFOperator::HashMinus
234        | DFOperator::AtQuestion
235        | DFOperator::Question
236        | DFOperator::QuestionAnd
237        | DFOperator::QuestionPipe => {
238            tracing::debug!(operator = %value, "Can't pushdown binary_operator operator");
239            Err(vortex_err!("Unsupported datafusion operator {value}"))
240        }
241    }
242}
243
244pub(crate) fn can_be_pushed_down(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
245    // We currently do not support pushdown of dynamic expressions in DF.
246    // See issue: https://github.com/vortex-data/vortex/issues/4034
247    if is_dynamic_physical_expr(df_expr) {
248        return false;
249    }
250
251    let expr = df_expr.as_any();
252    if let Some(binary) = expr.downcast_ref::<df_expr::BinaryExpr>() {
253        can_binary_be_pushed_down(binary, schema)
254    } else if let Some(col) = expr.downcast_ref::<df_expr::Column>() {
255        schema
256            .field_with_name(col.name())
257            .ok()
258            .is_some_and(|field| supported_data_types(field.data_type()))
259    } else if let Some(like) = expr.downcast_ref::<df_expr::LikeExpr>() {
260        can_be_pushed_down(like.expr(), schema) && can_be_pushed_down(like.pattern(), schema)
261    } else if let Some(lit) = expr.downcast_ref::<df_expr::Literal>() {
262        supported_data_types(&lit.value().data_type())
263    } else if expr.downcast_ref::<df_expr::CastExpr>().is_some()
264        || expr.downcast_ref::<df_expr::CastColumnExpr>().is_some()
265    {
266        true
267    } else if let Some(is_null) = expr.downcast_ref::<df_expr::IsNullExpr>() {
268        can_be_pushed_down(is_null.arg(), schema)
269    } else if let Some(is_not_null) = expr.downcast_ref::<df_expr::IsNotNullExpr>() {
270        can_be_pushed_down(is_not_null.arg(), schema)
271    } else if let Some(in_list) = expr.downcast_ref::<df_expr::InListExpr>() {
272        can_be_pushed_down(in_list.expr(), schema)
273            && in_list.list().iter().all(|e| can_be_pushed_down(e, schema))
274    } else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
275        can_scalar_fn_be_pushed_down(scalar_fn)
276    } else {
277        tracing::debug!(%df_expr, "DataFusion expression can't be pushed down");
278        false
279    }
280}
281
282fn can_binary_be_pushed_down(binary: &df_expr::BinaryExpr, schema: &Schema) -> bool {
283    let is_op_supported = try_operator_from_df(binary.op()).is_ok();
284    is_op_supported
285        && can_be_pushed_down(binary.left(), schema)
286        && can_be_pushed_down(binary.right(), schema)
287}
288
289fn supported_data_types(dt: &DataType) -> bool {
290    use DataType::*;
291
292    // For dictionary types, check if the value type is supported.
293    if let Dictionary(_, value_type) = dt {
294        return supported_data_types(value_type.as_ref());
295    }
296
297    let is_supported = dt.is_null()
298        || dt.is_numeric()
299        || matches!(
300            dt,
301            Boolean
302                | Utf8
303                | LargeUtf8
304                | Utf8View
305                | Binary
306                | LargeBinary
307                | BinaryView
308                | Date32
309                | Date64
310                | Timestamp(_, _)
311                | Time32(_)
312                | Time64(_)
313        );
314
315    if !is_supported {
316        tracing::debug!("DataFusion data type {dt:?} is not supported");
317    }
318
319    is_supported
320}
321
/// Checks whether a scalar function expression can be pushed down.
///
/// Only `get_field` (struct field access) is supported, matching what
/// `DefaultExpressionConvertor::try_convert_scalar_function` can convert.
fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr) -> bool {
    ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn).is_some()
}
326
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use arrow_schema::TimeUnit as ArrowTimeUnit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::Operator as DFOperator;
    use datafusion_physical_expr::PhysicalExpr;
    use datafusion_physical_plan::expressions as df_expr;
    use insta::assert_snapshot;
    use rstest::rstest;

    use super::*;

    /// Schema shared by the pushdown tests: supported scalar columns plus one
    /// unsupported list column.
    #[rstest::fixture]
    fn test_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("score", DataType::Float64, true),
            Field::new("active", DataType::Boolean, false),
            Field::new(
                "created_at",
                DataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "unsupported_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
        ])
    }

    #[test]
    fn test_make_vortex_predicate_empty() {
        let expr_convertor = DefaultExpressionConvertor::default();
        let result = make_vortex_predicate(&expr_convertor, &[]).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_make_vortex_predicate_single() {
        let expr_convertor = DefaultExpressionConvertor::default();
        let col_expr = Arc::new(df_expr::Column::new("test", 0)) as Arc<dyn PhysicalExpr>;
        let result = make_vortex_predicate(&expr_convertor, &[col_expr]).unwrap();
        assert!(result.is_some());
    }

    #[test]
    fn test_make_vortex_predicate_multiple() {
        let expr_convertor = DefaultExpressionConvertor::default();
        let col1 = Arc::new(df_expr::Column::new("col1", 0)) as Arc<dyn PhysicalExpr>;
        let col2 = Arc::new(df_expr::Column::new("col2", 1)) as Arc<dyn PhysicalExpr>;
        let result = make_vortex_predicate(&expr_convertor, &[col1, col2]).unwrap();
        assert!(result.is_some());
        // Result should be an AND expression combining the two columns
    }

    // NOTE: case names must be unique — `plus` was previously reused for the
    // minus/multiply/divide cases, producing misleading generated test names.
    #[rstest]
    #[case::eq(DFOperator::Eq, Operator::Eq)]
    #[case::not_eq(DFOperator::NotEq, Operator::NotEq)]
    #[case::lt(DFOperator::Lt, Operator::Lt)]
    #[case::lte(DFOperator::LtEq, Operator::Lte)]
    #[case::gt(DFOperator::Gt, Operator::Gt)]
    #[case::gte(DFOperator::GtEq, Operator::Gte)]
    #[case::and(DFOperator::And, Operator::And)]
    #[case::or(DFOperator::Or, Operator::Or)]
    #[case::plus(DFOperator::Plus, Operator::Add)]
    #[case::minus(DFOperator::Minus, Operator::Sub)]
    #[case::multiply(DFOperator::Multiply, Operator::Mul)]
    #[case::divide(DFOperator::Divide, Operator::Div)]
    fn test_operator_conversion_supported(
        #[case] df_op: DFOperator,
        #[case] expected_vortex_op: Operator,
    ) {
        let result = try_operator_from_df(&df_op).unwrap();
        assert_eq!(result, expected_vortex_op);
    }

    #[rstest]
    #[case::modulo(DFOperator::Modulo)]
    #[case::bitwise_and(DFOperator::BitwiseAnd)]
    #[case::regex_match(DFOperator::RegexMatch)]
    #[case::like_match(DFOperator::LikeMatch)]
    fn test_operator_conversion_unsupported(#[case] df_op: DFOperator) {
        let result = try_operator_from_df(&df_op);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Unsupported datafusion operator")
        );
    }

    #[test]
    fn test_expr_from_df_column() {
        let col_expr = df_expr::Column::new("test_column", 0);
        let result = DefaultExpressionConvertor::default()
            .convert(&col_expr)
            .unwrap();

        assert_snapshot!(result.display_tree().to_string(), @r"
        vortex.get_item(test_column)
        └── input: vortex.root()
        ");
    }

    #[test]
    fn test_expr_from_df_literal() {
        let literal_expr = df_expr::Literal::new(ScalarValue::Int32(Some(42)));
        let result = DefaultExpressionConvertor::default()
            .convert(&literal_expr)
            .unwrap();

        assert_snapshot!(result.display_tree().to_string(), @"vortex.literal(42i32)");
    }

    #[test]
    fn test_expr_from_df_binary() {
        let left = Arc::new(df_expr::Column::new("left", 0)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = df_expr::BinaryExpr::new(left, DFOperator::Eq, right);

        let result = DefaultExpressionConvertor::default()
            .convert(&binary_expr)
            .unwrap();

        assert_snapshot!(result.display_tree().to_string(), @r"
        vortex.binary(=)
        ├── lhs: vortex.get_item(left)
        │   └── input: vortex.root()
        └── rhs: vortex.literal(42i32)
        ");
    }

    #[rstest]
    #[case::like_normal(false, false)]
    #[case::like_negated(true, false)]
    #[case::like_case_insensitive(false, true)]
    #[case::like_negated_case_insensitive(true, true)]
    fn test_expr_from_df_like(#[case] negated: bool, #[case] case_insensitive: bool) {
        let expr = Arc::new(df_expr::Column::new("text_col", 0)) as Arc<dyn PhysicalExpr>;
        let pattern = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
            "test%".to_string(),
        )))) as Arc<dyn PhysicalExpr>;
        let like_expr = df_expr::LikeExpr::new(negated, case_insensitive, expr, pattern);

        let result = DefaultExpressionConvertor::default()
            .convert(&like_expr)
            .unwrap();
        let like_opts = result.as_::<Like>();
        assert_eq!(
            like_opts,
            &LikeOptions {
                negated,
                case_insensitive
            }
        );
    }

    #[rstest]
    // Supported types
    #[case::null(DataType::Null, true)]
    #[case::boolean(DataType::Boolean, true)]
    #[case::int8(DataType::Int8, true)]
    #[case::int16(DataType::Int16, true)]
    #[case::int32(DataType::Int32, true)]
    #[case::int64(DataType::Int64, true)]
    #[case::uint8(DataType::UInt8, true)]
    #[case::uint16(DataType::UInt16, true)]
    #[case::uint32(DataType::UInt32, true)]
    #[case::uint64(DataType::UInt64, true)]
    #[case::float32(DataType::Float32, true)]
    #[case::float64(DataType::Float64, true)]
    #[case::utf8(DataType::Utf8, true)]
    #[case::utf8_view(DataType::Utf8View, true)]
    #[case::binary(DataType::Binary, true)]
    #[case::binary_view(DataType::BinaryView, true)]
    #[case::date32(DataType::Date32, true)]
    #[case::date64(DataType::Date64, true)]
    #[case::timestamp_ms(DataType::Timestamp(ArrowTimeUnit::Millisecond, None), true)]
    #[case::timestamp_us(
        DataType::Timestamp(ArrowTimeUnit::Microsecond, Some(Arc::from("UTC"))),
        true
    )]
    #[case::time32_s(DataType::Time32(ArrowTimeUnit::Second), true)]
    #[case::time64_ns(DataType::Time64(ArrowTimeUnit::Nanosecond), true)]
    // Unsupported types
    #[case::list(
        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
        false
    )]
    #[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()
    ), false)]
    // Dictionary types - should be supported if value type is supported
    #[case::dict_utf8(
        DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
        true
    )]
    #[case::dict_int32(
        DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Int32)),
        true
    )]
    #[case::dict_unsupported(
        DataType::Dictionary(
            Box::new(DataType::UInt32),
            Box::new(DataType::List(Arc::new(Field::new("item", DataType::Int32, true))))
        ),
        false
    )]
    fn test_supported_data_types(#[case] data_type: DataType, #[case] expected: bool) {
        assert_eq!(supported_data_types(&data_type), expected);
    }

    #[rstest]
    fn test_can_be_pushed_down_column_supported(test_schema: Schema) {
        let col_expr = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down(&col_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_column_unsupported_type(test_schema: Schema) {
        let col_expr =
            Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down(&col_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_column_not_found(test_schema: Schema) {
        let col_expr = Arc::new(df_expr::Column::new("nonexistent", 99)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down(&col_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_literal_supported(test_schema: Schema) {
        let lit_expr =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down(&lit_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_literal_unsupported(test_schema: Schema) {
        // Use a simpler unsupported type - Duration is not supported
        let unsupported_literal = ScalarValue::DurationSecond(Some(42));
        let lit_expr =
            Arc::new(df_expr::Literal::new(unsupported_literal)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down(&lit_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_binary_supported(test_schema: Schema) {
        let left = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = Arc::new(df_expr::BinaryExpr::new(left, DFOperator::Eq, right))
            as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down(&binary_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_binary_unsupported_operator(test_schema: Schema) {
        let left = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = Arc::new(df_expr::BinaryExpr::new(
            left,
            DFOperator::AtQuestion,
            right,
        )) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down(&binary_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_binary_unsupported_operand(test_schema: Schema) {
        let left = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = Arc::new(df_expr::BinaryExpr::new(left, DFOperator::Eq, right))
            as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down(&binary_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_like_supported(test_schema: Schema) {
        let expr = Arc::new(df_expr::Column::new("name", 1)) as Arc<dyn PhysicalExpr>;
        let pattern = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
            "test%".to_string(),
        )))) as Arc<dyn PhysicalExpr>;
        let like_expr =
            Arc::new(df_expr::LikeExpr::new(false, false, expr, pattern)) as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down(&like_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_like_unsupported_operand(test_schema: Schema) {
        let expr = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc<dyn PhysicalExpr>;
        let pattern = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
            "test%".to_string(),
        )))) as Arc<dyn PhysicalExpr>;
        let like_expr =
            Arc::new(df_expr::LikeExpr::new(false, false, expr, pattern)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down(&like_expr, &test_schema));
    }
}