// vortex_datafusion/convert/exprs.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use arrow_schema::DataType;
7use arrow_schema::Schema;
8use datafusion_common::Result as DFResult;
9use datafusion_common::exec_datafusion_err;
10use datafusion_common::tree_node::TreeNode;
11use datafusion_common::tree_node::TreeNodeRecursion;
12use datafusion_expr::Operator as DFOperator;
13use datafusion_functions::core::getfield::GetFieldFunc;
14use datafusion_physical_expr::PhysicalExpr;
15use datafusion_physical_expr::ScalarFunctionExpr;
16use datafusion_physical_expr::projection::ProjectionExpr;
17use datafusion_physical_expr::projection::ProjectionExprs;
18use datafusion_physical_expr::utils::collect_columns;
19use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
20use datafusion_physical_plan::expressions as df_expr;
21use itertools::Itertools;
22use vortex::dtype::DType;
23use vortex::dtype::Nullability;
24use vortex::dtype::arrow::FromArrowType;
25use vortex::expr::Binary;
26use vortex::expr::Expression;
27use vortex::expr::Like;
28use vortex::expr::LikeOptions;
29use vortex::expr::Operator;
30use vortex::expr::VTableExt;
31use vortex::expr::and_collect;
32use vortex::expr::cast;
33use vortex::expr::get_item;
34use vortex::expr::is_null;
35use vortex::expr::list_contains;
36use vortex::expr::lit;
37use vortex::expr::not;
38use vortex::expr::pack;
39use vortex::expr::root;
40use vortex::scalar::Scalar;
41
42use crate::convert::FromDataFusion;
43
/// Result of splitting a projection into Vortex expressions and leftover DataFusion projections.
pub struct ProcessedProjection {
    // Expression evaluated inside the Vortex scan itself.
    pub scan_projection: Expression,
    // DataFusion projections that must still run on top of the scan output.
    pub leftover_projection: ProjectionExprs,
}
49
50/// Tries to convert the expressions into a vortex conjunction. Will return Ok(None) iff the input conjunction is empty.
51pub(crate) fn make_vortex_predicate(
52    expr_convertor: &dyn ExpressionConvertor,
53    predicate: &[Arc<dyn PhysicalExpr>],
54) -> DFResult<Option<Expression>> {
55    let exprs = predicate
56        .iter()
57        .map(|e| expr_convertor.convert(e.as_ref()))
58        .collect::<DFResult<Vec<_>>>()?;
59
60    Ok(and_collect(exprs))
61}
62
63/// Trait for converting DataFusion expressions to Vortex ones.
64pub trait ExpressionConvertor: Send + Sync {
65    /// Can an expression be pushed down given a specific schema
66    fn can_be_pushed_down(&self, expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool;
67
68    /// Try and convert a DataFusion [`PhysicalExpr`] into a Vortex [`Expression`].
69    fn convert(&self, expr: &dyn PhysicalExpr) -> DFResult<Expression>;
70
71    /// Split a projection into Vortex expressions that can be pushed down and leftover
72    /// DataFusion projections that need to be evaluated after the scan.
73    fn split_projection(
74        &self,
75        source_projection: ProjectionExprs,
76        input_schema: &Schema,
77        output_schema: &Schema,
78    ) -> DFResult<ProcessedProjection>;
79
80    /// Create a projection that reads only the required columns without pushing down
81    /// any expressions. All projection logic is applied after the scan.
82    fn no_pushdown_projection(
83        &self,
84        source_projection: ProjectionExprs,
85        input_schema: &Schema,
86    ) -> DFResult<ProcessedProjection> {
87        // Get all unique column indices referenced by the projection
88        let column_indices = source_projection.column_indices();
89
90        // Create scan projection that reads the required columns
91        let scan_columns: Vec<(String, Expression)> = column_indices
92            .into_iter()
93            .map(|idx| {
94                let field = input_schema.field(idx);
95                let name = field.name().clone();
96                (name.clone(), get_item(name, root()))
97            })
98            .collect();
99
100        Ok(ProcessedProjection {
101            scan_projection: pack(scan_columns, Nullability::NonNullable),
102            leftover_projection: source_projection,
103        })
104    }
105}
106
/// The default [`ExpressionConvertor`].
///
/// Stateless; construct via `Default::default()`.
#[derive(Default)]
pub struct DefaultExpressionConvertor {}
110
111impl DefaultExpressionConvertor {
112    /// Attempts to convert a DataFusion ScalarFunctionExpr to a Vortex expression.
113    fn try_convert_scalar_function(&self, scalar_fn: &ScalarFunctionExpr) -> DFResult<Expression> {
114        if let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn)
115        {
116            // DataFusion's GetFieldFunc flattens nested field access into a single call
117            // with multiple field name arguments. For example, `outer.inner.leaf` becomes
118            // get_field(Column("outer"), "inner", "leaf"). We build a chain of get_item
119            // calls for each field name in the path.
120            let (source_expr, field_names) = get_field_fn
121                .args()
122                .split_first()
123                .ok_or_else(|| exec_datafusion_err!("get_field missing source expression"))?;
124
125            let mut result = self.convert(source_expr.as_ref())?;
126            for expr in field_names {
127                let field_name = expr
128                    .as_any()
129                    .downcast_ref::<df_expr::Literal>()
130                    .ok_or_else(|| exec_datafusion_err!("get_field field name must be a literal"))?
131                    .value()
132                    .try_as_str()
133                    .flatten()
134                    .ok_or_else(|| {
135                        exec_datafusion_err!("get_field field name must be a UTF-8 string")
136                    })?;
137                result = get_item(field_name.to_string(), result);
138            }
139            return Ok(result);
140        }
141
142        Err(exec_datafusion_err!(
143            "Unsupported ScalarFunctionExpr: {}",
144            scalar_fn.name()
145        ))
146    }
147}
148
149impl ExpressionConvertor for DefaultExpressionConvertor {
150    fn can_be_pushed_down(&self, expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
151        can_be_pushed_down_impl(expr, schema)
152    }
153
154    fn convert(&self, df: &dyn PhysicalExpr) -> DFResult<Expression> {
155        // TODO(joe): Don't return an error when we have an unsupported node, bubble up "TRUE" as in keep
156        //  for that node, up to any `and` or `or` node.
157        if let Some(binary_expr) = df.as_any().downcast_ref::<df_expr::BinaryExpr>() {
158            let left = self.convert(binary_expr.left().as_ref())?;
159            let right = self.convert(binary_expr.right().as_ref())?;
160            let operator = try_operator_from_df(binary_expr.op())?;
161
162            return Ok(Binary.new_expr(operator, [left, right]));
163        }
164
165        if let Some(col_expr) = df.as_any().downcast_ref::<df_expr::Column>() {
166            return Ok(get_item(col_expr.name().to_owned(), root()));
167        }
168
169        if let Some(like) = df.as_any().downcast_ref::<df_expr::LikeExpr>() {
170            let child = self.convert(like.expr().as_ref())?;
171            let pattern = self.convert(like.pattern().as_ref())?;
172            return Ok(Like.new_expr(
173                LikeOptions {
174                    negated: like.negated(),
175                    case_insensitive: like.case_insensitive(),
176                },
177                [child, pattern],
178            ));
179        }
180
181        if let Some(literal) = df.as_any().downcast_ref::<df_expr::Literal>() {
182            let value = Scalar::from_df(literal.value());
183            return Ok(lit(value));
184        }
185
186        if let Some(cast_expr) = df.as_any().downcast_ref::<df_expr::CastExpr>() {
187            let cast_dtype = DType::from_arrow((cast_expr.cast_type(), Nullability::Nullable));
188            let child = self.convert(cast_expr.expr().as_ref())?;
189            return Ok(cast(child, cast_dtype));
190        }
191
192        if let Some(cast_col_expr) = df.as_any().downcast_ref::<df_expr::CastColumnExpr>() {
193            let target = cast_col_expr.target_field();
194
195            let target_dtype = DType::from_arrow((target.data_type(), target.is_nullable().into()));
196            let child = self.convert(cast_col_expr.expr().as_ref())?;
197            return Ok(cast(child, target_dtype));
198        }
199
200        if let Some(is_null_expr) = df.as_any().downcast_ref::<df_expr::IsNullExpr>() {
201            let arg = self.convert(is_null_expr.arg().as_ref())?;
202            return Ok(is_null(arg));
203        }
204
205        if let Some(is_not_null_expr) = df.as_any().downcast_ref::<df_expr::IsNotNullExpr>() {
206            let arg = self.convert(is_not_null_expr.arg().as_ref())?;
207            return Ok(not(is_null(arg)));
208        }
209
210        if let Some(in_list) = df.as_any().downcast_ref::<df_expr::InListExpr>() {
211            let value = self.convert(in_list.expr().as_ref())?;
212            let list_elements: Vec<_> = in_list
213                .list()
214                .iter()
215                .map(|e| {
216                    if let Some(lit) = e.as_any().downcast_ref::<df_expr::Literal>() {
217                        Ok(Scalar::from_df(lit.value()))
218                    } else {
219                        Err(exec_datafusion_err!("Failed to cast sub-expression"))
220                    }
221                })
222                .try_collect()?;
223
224            let list = Scalar::list(
225                list_elements[0].dtype().clone(),
226                list_elements,
227                Nullability::Nullable,
228            );
229            let expr = list_contains(lit(list), value);
230
231            return Ok(if in_list.negated() { not(expr) } else { expr });
232        }
233
234        if let Some(scalar_fn) = df.as_any().downcast_ref::<ScalarFunctionExpr>() {
235            return self.try_convert_scalar_function(scalar_fn);
236        }
237
238        Err(exec_datafusion_err!(
239            "Couldn't convert DataFusion physical {df} expression to a vortex expression"
240        ))
241    }
242
243    fn split_projection(
244        &self,
245        source_projection: ProjectionExprs,
246        input_schema: &Schema,
247        output_schema: &Schema,
248    ) -> DFResult<ProcessedProjection> {
249        let mut scan_projection = vec![];
250        let mut leftover_projection: Vec<ProjectionExpr> = vec![];
251
252        for projection_expr in source_projection.iter() {
253            let r = projection_expr.expr.apply(|node| {
254                // We only pull column children of scalar functions that we can't push into the scan.
255                if let Some(scalar_fn_expr) = node.as_any().downcast_ref::<ScalarFunctionExpr>()
256                    && !can_scalar_fn_be_pushed_down(scalar_fn_expr)
257                {
258                    scan_projection.extend(
259                        collect_columns(node)
260                            .into_iter()
261                            .map(|c| (c.name().to_string(), get_item(c.name(), root()))),
262                    );
263
264                    leftover_projection.push(projection_expr.clone());
265                    return Ok(TreeNodeRecursion::Stop);
266                }
267
268                // DataFusion assumes different decimal types can be coerced.
269                // Vortex expects a perfect match so we don't push it down.
270                if let Some(binary_expr) = node.as_any().downcast_ref::<df_expr::BinaryExpr>()
271                    && binary_expr.op().is_numerical_operators()
272                    && (is_decimal(&binary_expr.left().data_type(input_schema)?)
273                        && is_decimal(&binary_expr.right().data_type(input_schema)?))
274                {
275                    scan_projection.extend(
276                        collect_columns(node)
277                            .into_iter()
278                            .map(|c| (c.name().to_string(), get_item(c.name(), root()))),
279                    );
280
281                    leftover_projection.push(projection_expr.clone());
282                    return Ok(TreeNodeRecursion::Stop);
283                }
284
285                Ok(TreeNodeRecursion::Continue)
286            })?;
287
288            // if we didn't stop early
289            if matches!(r, TreeNodeRecursion::Continue) {
290                scan_projection.push((
291                    projection_expr.alias.clone(),
292                    self.convert(projection_expr.expr.as_ref())?,
293                ));
294                leftover_projection.push(ProjectionExpr {
295                    expr: Arc::new(df_expr::Column::new_with_schema(
296                        projection_expr.alias.as_str(),
297                        output_schema,
298                    )?),
299                    alias: projection_expr.alias.clone(),
300                });
301            }
302        }
303
304        Ok(ProcessedProjection {
305            scan_projection: pack(scan_projection, Nullability::NonNullable),
306            leftover_projection: leftover_projection.into(),
307        })
308    }
309}
310
311fn try_operator_from_df(value: &DFOperator) -> DFResult<Operator> {
312    match value {
313        DFOperator::Eq => Ok(Operator::Eq),
314        DFOperator::NotEq => Ok(Operator::NotEq),
315        DFOperator::Lt => Ok(Operator::Lt),
316        DFOperator::LtEq => Ok(Operator::Lte),
317        DFOperator::Gt => Ok(Operator::Gt),
318        DFOperator::GtEq => Ok(Operator::Gte),
319        DFOperator::And => Ok(Operator::And),
320        DFOperator::Or => Ok(Operator::Or),
321        DFOperator::Plus => Ok(Operator::Add),
322        DFOperator::Minus => Ok(Operator::Sub),
323        DFOperator::Multiply => Ok(Operator::Mul),
324        DFOperator::Divide => Ok(Operator::Div),
325        DFOperator::IsDistinctFrom
326        | DFOperator::IsNotDistinctFrom
327        | DFOperator::RegexMatch
328        | DFOperator::RegexIMatch
329        | DFOperator::RegexNotMatch
330        | DFOperator::RegexNotIMatch
331        | DFOperator::LikeMatch
332        | DFOperator::ILikeMatch
333        | DFOperator::NotLikeMatch
334        | DFOperator::NotILikeMatch
335        | DFOperator::BitwiseAnd
336        | DFOperator::BitwiseOr
337        | DFOperator::BitwiseXor
338        | DFOperator::BitwiseShiftRight
339        | DFOperator::BitwiseShiftLeft
340        | DFOperator::StringConcat
341        | DFOperator::AtArrow
342        | DFOperator::ArrowAt
343        | DFOperator::Modulo
344        | DFOperator::Arrow
345        | DFOperator::LongArrow
346        | DFOperator::HashArrow
347        | DFOperator::HashLongArrow
348        | DFOperator::AtAt
349        | DFOperator::IntegerDivide
350        | DFOperator::HashMinus
351        | DFOperator::AtQuestion
352        | DFOperator::Question
353        | DFOperator::QuestionAnd
354        | DFOperator::QuestionPipe => {
355            tracing::debug!(operator = %value, "Can't pushdown binary_operator operator");
356            Err(exec_datafusion_err!(
357                "Unsupported datafusion operator {value}"
358            ))
359        }
360    }
361}
362
363fn can_be_pushed_down_impl(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
364    // We currently do not support pushdown of dynamic expressions in DF.
365    // See issue: https://github.com/vortex-data/vortex/issues/4034
366    if is_dynamic_physical_expr(df_expr) {
367        return false;
368    }
369
370    let expr = df_expr.as_any();
371    if let Some(binary) = expr.downcast_ref::<df_expr::BinaryExpr>() {
372        can_binary_be_pushed_down(binary, schema)
373    } else if let Some(col) = expr.downcast_ref::<df_expr::Column>() {
374        schema
375            .field_with_name(col.name())
376            .ok()
377            .is_some_and(|field| supported_data_types(field.data_type()))
378    } else if let Some(like) = expr.downcast_ref::<df_expr::LikeExpr>() {
379        can_be_pushed_down_impl(like.expr(), schema)
380            && can_be_pushed_down_impl(like.pattern(), schema)
381    } else if let Some(lit) = expr.downcast_ref::<df_expr::Literal>() {
382        supported_data_types(&lit.value().data_type())
383    } else if expr.downcast_ref::<df_expr::CastExpr>().is_some()
384        || expr.downcast_ref::<df_expr::CastColumnExpr>().is_some()
385    {
386        true
387    } else if let Some(is_null) = expr.downcast_ref::<df_expr::IsNullExpr>() {
388        can_be_pushed_down_impl(is_null.arg(), schema)
389    } else if let Some(is_not_null) = expr.downcast_ref::<df_expr::IsNotNullExpr>() {
390        can_be_pushed_down_impl(is_not_null.arg(), schema)
391    } else if let Some(in_list) = expr.downcast_ref::<df_expr::InListExpr>() {
392        can_be_pushed_down_impl(in_list.expr(), schema)
393            && in_list
394                .list()
395                .iter()
396                .all(|e| can_be_pushed_down_impl(e, schema))
397    } else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
398        can_scalar_fn_be_pushed_down(scalar_fn)
399    } else {
400        tracing::debug!(%df_expr, "DataFusion expression can't be pushed down");
401        false
402    }
403}
404
405fn can_binary_be_pushed_down(binary: &df_expr::BinaryExpr, schema: &Schema) -> bool {
406    let is_op_supported = try_operator_from_df(binary.op()).is_ok();
407    is_op_supported
408        && can_be_pushed_down_impl(binary.left(), schema)
409        && can_be_pushed_down_impl(binary.right(), schema)
410}
411
412fn supported_data_types(dt: &DataType) -> bool {
413    use DataType::*;
414
415    // For dictionary types, check if the value type is supported.
416    if let Dictionary(_, value_type) = dt {
417        return supported_data_types(value_type.as_ref());
418    }
419
420    let is_supported = dt.is_null()
421        || dt.is_numeric()
422        || matches!(
423            dt,
424            Boolean
425                | Utf8
426                | LargeUtf8
427                | Utf8View
428                | Binary
429                | LargeBinary
430                | BinaryView
431                | Date32
432                | Date64
433                | Timestamp(_, _)
434                | Time32(_)
435                | Time64(_)
436        );
437
438    if !is_supported {
439        tracing::debug!("DataFusion data type {dt:?} is not supported");
440    }
441
442    is_supported
443}
444
/// Returns true iff the scalar function is DataFusion's `get_field`, the only
/// scalar function this module can translate into a Vortex expression.
fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr) -> bool {
    ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn).is_some()
}
449
450// TODO(adam): Replace with `DataType::is_decimal` once its released.
451fn is_decimal(dt: &DataType) -> bool {
452    matches!(
453        dt,
454        DataType::Decimal32(_, _)
455            | DataType::Decimal64(_, _)
456            | DataType::Decimal128(_, _)
457            | DataType::Decimal256(_, _)
458    )
459}
460
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use arrow_schema::TimeUnit as ArrowTimeUnit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::Operator as DFOperator;
    use datafusion_physical_expr::PhysicalExpr;
    use datafusion_physical_plan::expressions as df_expr;
    use insta::assert_snapshot;
    use rstest::rstest;

    use super::*;
    use crate::common_tests::TestSessionContext;

    /// Schema shared by the pushdown tests: supported scalar columns plus one
    /// list column that pushdown does not support.
    #[rstest::fixture]
    fn test_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("score", DataType::Float64, true),
            Field::new("active", DataType::Boolean, false),
            Field::new(
                "created_at",
                DataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "unsupported_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
        ])
    }

    #[test]
    fn test_make_vortex_predicate_empty() {
        let expr_convertor = DefaultExpressionConvertor::default();
        let result = make_vortex_predicate(&expr_convertor, &[]).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_make_vortex_predicate_single() {
        let expr_convertor = DefaultExpressionConvertor::default();
        let col_expr = Arc::new(df_expr::Column::new("test", 0)) as Arc<dyn PhysicalExpr>;
        let result = make_vortex_predicate(&expr_convertor, &[col_expr]).unwrap();
        assert!(result.is_some());
    }

    #[test]
    fn test_make_vortex_predicate_multiple() {
        let expr_convertor = DefaultExpressionConvertor::default();
        let col1 = Arc::new(df_expr::Column::new("col1", 0)) as Arc<dyn PhysicalExpr>;
        let col2 = Arc::new(df_expr::Column::new("col2", 1)) as Arc<dyn PhysicalExpr>;
        let result = make_vortex_predicate(&expr_convertor, &[col1, col2]).unwrap();
        assert!(result.is_some());
        // Result should be an AND expression combining the two columns
    }

    // NOTE: the arithmetic case labels below were all previously named `plus`,
    // which produced misleading generated test names; each now matches its op.
    #[rstest]
    #[case::eq(DFOperator::Eq, Operator::Eq)]
    #[case::not_eq(DFOperator::NotEq, Operator::NotEq)]
    #[case::lt(DFOperator::Lt, Operator::Lt)]
    #[case::lte(DFOperator::LtEq, Operator::Lte)]
    #[case::gt(DFOperator::Gt, Operator::Gt)]
    #[case::gte(DFOperator::GtEq, Operator::Gte)]
    #[case::and(DFOperator::And, Operator::And)]
    #[case::or(DFOperator::Or, Operator::Or)]
    #[case::plus(DFOperator::Plus, Operator::Add)]
    #[case::minus(DFOperator::Minus, Operator::Sub)]
    #[case::multiply(DFOperator::Multiply, Operator::Mul)]
    #[case::divide(DFOperator::Divide, Operator::Div)]
    fn test_operator_conversion_supported(
        #[case] df_op: DFOperator,
        #[case] expected_vortex_op: Operator,
    ) {
        let result = try_operator_from_df(&df_op).unwrap();
        assert_eq!(result, expected_vortex_op);
    }

    #[rstest]
    #[case::modulo(DFOperator::Modulo)]
    #[case::bitwise_and(DFOperator::BitwiseAnd)]
    #[case::regex_match(DFOperator::RegexMatch)]
    #[case::like_match(DFOperator::LikeMatch)]
    fn test_operator_conversion_unsupported(#[case] df_op: DFOperator) {
        let result = try_operator_from_df(&df_op);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Unsupported datafusion operator")
        );
    }

    #[test]
    fn test_expr_from_df_column() {
        let col_expr = df_expr::Column::new("test_column", 0);
        let result = DefaultExpressionConvertor::default()
            .convert(&col_expr)
            .unwrap();

        assert_snapshot!(result.display_tree().to_string(), @r"
        vortex.get_item(test_column)
        └── input: vortex.root()
        ");
    }

    #[test]
    fn test_expr_from_df_literal() {
        let literal_expr = df_expr::Literal::new(ScalarValue::Int32(Some(42)));
        let result = DefaultExpressionConvertor::default()
            .convert(&literal_expr)
            .unwrap();

        assert_snapshot!(result.display_tree().to_string(), @"vortex.literal(42i32)");
    }

    #[test]
    fn test_expr_from_df_binary() {
        let left = Arc::new(df_expr::Column::new("left", 0)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = df_expr::BinaryExpr::new(left, DFOperator::Eq, right);

        let result = DefaultExpressionConvertor::default()
            .convert(&binary_expr)
            .unwrap();

        assert_snapshot!(result.display_tree().to_string(), @r"
        vortex.binary(=)
        ├── lhs: vortex.get_item(left)
        │   └── input: vortex.root()
        └── rhs: vortex.literal(42i32)
        ");
    }

    #[rstest]
    #[case::like_normal(false, false)]
    #[case::like_negated(true, false)]
    #[case::like_case_insensitive(false, true)]
    #[case::like_negated_case_insensitive(true, true)]
    fn test_expr_from_df_like(#[case] negated: bool, #[case] case_insensitive: bool) {
        let expr = Arc::new(df_expr::Column::new("text_col", 0)) as Arc<dyn PhysicalExpr>;
        let pattern = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
            "test%".to_string(),
        )))) as Arc<dyn PhysicalExpr>;
        let like_expr = df_expr::LikeExpr::new(negated, case_insensitive, expr, pattern);

        let result = DefaultExpressionConvertor::default()
            .convert(&like_expr)
            .unwrap();
        let like_opts = result.as_::<Like>();
        assert_eq!(
            like_opts,
            &LikeOptions {
                negated,
                case_insensitive
            }
        );
    }

    #[rstest]
    // Supported types
    #[case::null(DataType::Null, true)]
    #[case::boolean(DataType::Boolean, true)]
    #[case::int8(DataType::Int8, true)]
    #[case::int16(DataType::Int16, true)]
    #[case::int32(DataType::Int32, true)]
    #[case::int64(DataType::Int64, true)]
    #[case::uint8(DataType::UInt8, true)]
    #[case::uint16(DataType::UInt16, true)]
    #[case::uint32(DataType::UInt32, true)]
    #[case::uint64(DataType::UInt64, true)]
    #[case::float32(DataType::Float32, true)]
    #[case::float64(DataType::Float64, true)]
    #[case::utf8(DataType::Utf8, true)]
    #[case::utf8_view(DataType::Utf8View, true)]
    #[case::binary(DataType::Binary, true)]
    #[case::binary_view(DataType::BinaryView, true)]
    #[case::date32(DataType::Date32, true)]
    #[case::date64(DataType::Date64, true)]
    #[case::timestamp_ms(DataType::Timestamp(ArrowTimeUnit::Millisecond, None), true)]
    #[case::timestamp_us(
        DataType::Timestamp(ArrowTimeUnit::Microsecond, Some(Arc::from("UTC"))),
        true
    )]
    #[case::time32_s(DataType::Time32(ArrowTimeUnit::Second), true)]
    #[case::time64_ns(DataType::Time64(ArrowTimeUnit::Nanosecond), true)]
    // Unsupported types
    #[case::list(
        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
        false
    )]
    #[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()
    ), false)]
    // Dictionary types - should be supported if value type is supported
    #[case::dict_utf8(
        DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
        true
    )]
    #[case::dict_int32(
        DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Int32)),
        true
    )]
    #[case::dict_unsupported(
        DataType::Dictionary(
            Box::new(DataType::UInt32),
            Box::new(DataType::List(Arc::new(Field::new("item", DataType::Int32, true))))
        ),
        false
    )]
    fn test_supported_data_types(#[case] data_type: DataType, #[case] expected: bool) {
        assert_eq!(supported_data_types(&data_type), expected);
    }

    #[rstest]
    fn test_can_be_pushed_down_column_supported(test_schema: Schema) {
        let col_expr = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down_impl(&col_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_column_unsupported_type(test_schema: Schema) {
        let col_expr =
            Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down_impl(&col_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_column_not_found(test_schema: Schema) {
        let col_expr = Arc::new(df_expr::Column::new("nonexistent", 99)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down_impl(&col_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_literal_supported(test_schema: Schema) {
        let lit_expr =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down_impl(&lit_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_literal_unsupported(test_schema: Schema) {
        // Use a simpler unsupported type - Duration is not supported
        let unsupported_literal = ScalarValue::DurationSecond(Some(42));
        let lit_expr =
            Arc::new(df_expr::Literal::new(unsupported_literal)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down_impl(&lit_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_binary_supported(test_schema: Schema) {
        let left = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = Arc::new(df_expr::BinaryExpr::new(left, DFOperator::Eq, right))
            as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down_impl(&binary_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_binary_unsupported_operator(test_schema: Schema) {
        let left = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = Arc::new(df_expr::BinaryExpr::new(
            left,
            DFOperator::AtQuestion,
            right,
        )) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down_impl(&binary_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_binary_unsupported_operand(test_schema: Schema) {
        let left = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = Arc::new(df_expr::BinaryExpr::new(left, DFOperator::Eq, right))
            as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down_impl(&binary_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_like_supported(test_schema: Schema) {
        let expr = Arc::new(df_expr::Column::new("name", 1)) as Arc<dyn PhysicalExpr>;
        let pattern = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
            "test%".to_string(),
        )))) as Arc<dyn PhysicalExpr>;
        let like_expr =
            Arc::new(df_expr::LikeExpr::new(false, false, expr, pattern)) as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down_impl(&like_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_like_unsupported_operand(test_schema: Schema) {
        let expr = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc<dyn PhysicalExpr>;
        let pattern = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
            "test%".to_string(),
        )))) as Arc<dyn PhysicalExpr>;
        let like_expr =
            Arc::new(df_expr::LikeExpr::new(false, false, expr, pattern)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down_impl(&like_expr, &test_schema));
    }

    // https://github.com/vortex-data/vortex/issues/6211
    #[tokio::test]
    async fn test_cast_int_to_string() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        ctx.session
            .sql(r#"copy (select 1 as id) to 'example.vortex'"#)
            .await?
            .show()
            .await?;

        ctx.session
            .sql(r#"select cast(id as string) as sid from 'example.vortex' where id > 0"#)
            .await?
            .show()
            .await?;

        ctx.session
            .sql(r#"select id from 'example.vortex' where cast (id as string) == '1'"#)
            .await?
            .show()
            .await?;

        // This fails as it pushes string cast to the scan
        ctx.session
            .sql(r#"select cast(id as string) from 'example.vortex'"#)
            .await?
            .collect()
            .await?;

        Ok(())
    }
}