1use std::sync::Arc;
5
6use arrow_schema::DataType;
7use arrow_schema::Schema;
8use datafusion_common::Result as DFResult;
9use datafusion_common::exec_datafusion_err;
10use datafusion_common::tree_node::TreeNode;
11use datafusion_common::tree_node::TreeNodeRecursion;
12use datafusion_expr::Operator as DFOperator;
13use datafusion_functions::core::getfield::GetFieldFunc;
14use datafusion_physical_expr::PhysicalExpr;
15use datafusion_physical_expr::ScalarFunctionExpr;
16use datafusion_physical_expr::projection::ProjectionExpr;
17use datafusion_physical_expr::projection::ProjectionExprs;
18use datafusion_physical_expr::utils::collect_columns;
19use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
20use datafusion_physical_plan::expressions as df_expr;
21use itertools::Itertools;
22use vortex::dtype::DType;
23use vortex::dtype::Nullability;
24use vortex::dtype::arrow::FromArrowType;
25use vortex::expr::Binary;
26use vortex::expr::Expression;
27use vortex::expr::Like;
28use vortex::expr::LikeOptions;
29use vortex::expr::Operator;
30use vortex::expr::VTableExt;
31use vortex::expr::and_collect;
32use vortex::expr::cast;
33use vortex::expr::get_item;
34use vortex::expr::is_null;
35use vortex::expr::list_contains;
36use vortex::expr::lit;
37use vortex::expr::not;
38use vortex::expr::pack;
39use vortex::expr::root;
40use vortex::scalar::Scalar;
41
42use crate::convert::FromDataFusion;
43
/// Result of splitting a DataFusion projection into the part that can be
/// evaluated inside the Vortex scan and the part DataFusion must still apply
/// on top of the scan's output.
pub struct ProcessedProjection {
    /// Packed vortex expression evaluated by the scan itself.
    pub scan_projection: Expression,
    /// Remaining DataFusion projection applied to the scan's output.
    pub leftover_projection: ProjectionExprs,
}
49
50pub(crate) fn make_vortex_predicate(
52 expr_convertor: &dyn ExpressionConvertor,
53 predicate: &[Arc<dyn PhysicalExpr>],
54) -> DFResult<Option<Expression>> {
55 let exprs = predicate
56 .iter()
57 .map(|e| expr_convertor.convert(e.as_ref()))
58 .collect::<DFResult<Vec<_>>>()?;
59
60 Ok(and_collect(exprs))
61}
62
/// Converts DataFusion physical expressions into vortex [`Expression`]s and
/// decides which parts of a query can be pushed down into a Vortex scan.
pub trait ExpressionConvertor: Send + Sync {
    /// Whether `expr` can be fully evaluated by the scan against `schema`.
    fn can_be_pushed_down(&self, expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool;

    /// Translate `expr` into a vortex [`Expression`]; errors if any node has
    /// no vortex equivalent.
    fn convert(&self, expr: &dyn PhysicalExpr) -> DFResult<Expression>;

    /// Split `source_projection` into a scan-side projection and a leftover
    /// projection DataFusion applies on the scan output.
    fn split_projection(
        &self,
        source_projection: ProjectionExprs,
        input_schema: &Schema,
        output_schema: &Schema,
    ) -> DFResult<ProcessedProjection>;

    /// Fallback when no expression pushdown is possible: the scan only reads
    /// the referenced columns and the entire projection stays with DataFusion.
    fn no_pushdown_projection(
        &self,
        source_projection: ProjectionExprs,
        input_schema: &Schema,
    ) -> DFResult<ProcessedProjection> {
        let column_indices = source_projection.column_indices();

        // Read each referenced column verbatim from the scan root.
        let scan_columns: Vec<(String, Expression)> = column_indices
            .into_iter()
            .map(|idx| {
                let field = input_schema.field(idx);
                let name = field.name().clone();
                (name.clone(), get_item(name, root()))
            })
            .collect();

        Ok(ProcessedProjection {
            scan_projection: pack(scan_columns, Nullability::NonNullable),
            leftover_projection: source_projection,
        })
    }
}
106
/// Stateless default [`ExpressionConvertor`] implementation.
#[derive(Default)]
pub struct DefaultExpressionConvertor {}
110
impl DefaultExpressionConvertor {
    /// Convert a scalar function call. Only `get_field` (struct field access)
    /// is supported; it is translated into a chain of `get_item` expressions.
    fn try_convert_scalar_function(&self, scalar_fn: &ScalarFunctionExpr) -> DFResult<Expression> {
        if let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn)
        {
            // First argument is the struct-valued source; the rest are the
            // field names to descend through, in order.
            let (source_expr, field_names) = get_field_fn
                .args()
                .split_first()
                .ok_or_else(|| exec_datafusion_err!("get_field missing source expression"))?;

            let mut result = self.convert(source_expr.as_ref())?;
            for expr in field_names {
                // Each field name must be a UTF-8 literal; anything else
                // cannot be resolved statically into a get_item.
                let field_name = expr
                    .as_any()
                    .downcast_ref::<df_expr::Literal>()
                    .ok_or_else(|| exec_datafusion_err!("get_field field name must be a literal"))?
                    .value()
                    .try_as_str()
                    .flatten()
                    .ok_or_else(|| {
                        exec_datafusion_err!("get_field field name must be a UTF-8 string")
                    })?;
                result = get_item(field_name.to_string(), result);
            }
            return Ok(result);
        }

        Err(exec_datafusion_err!(
            "Unsupported ScalarFunctionExpr: {}",
            scalar_fn.name()
        ))
    }
}
148
149impl ExpressionConvertor for DefaultExpressionConvertor {
150 fn can_be_pushed_down(&self, expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
151 can_be_pushed_down_impl(expr, schema)
152 }
153
154 fn convert(&self, df: &dyn PhysicalExpr) -> DFResult<Expression> {
155 if let Some(binary_expr) = df.as_any().downcast_ref::<df_expr::BinaryExpr>() {
158 let left = self.convert(binary_expr.left().as_ref())?;
159 let right = self.convert(binary_expr.right().as_ref())?;
160 let operator = try_operator_from_df(binary_expr.op())?;
161
162 return Ok(Binary.new_expr(operator, [left, right]));
163 }
164
165 if let Some(col_expr) = df.as_any().downcast_ref::<df_expr::Column>() {
166 return Ok(get_item(col_expr.name().to_owned(), root()));
167 }
168
169 if let Some(like) = df.as_any().downcast_ref::<df_expr::LikeExpr>() {
170 let child = self.convert(like.expr().as_ref())?;
171 let pattern = self.convert(like.pattern().as_ref())?;
172 return Ok(Like.new_expr(
173 LikeOptions {
174 negated: like.negated(),
175 case_insensitive: like.case_insensitive(),
176 },
177 [child, pattern],
178 ));
179 }
180
181 if let Some(literal) = df.as_any().downcast_ref::<df_expr::Literal>() {
182 let value = Scalar::from_df(literal.value());
183 return Ok(lit(value));
184 }
185
186 if let Some(cast_expr) = df.as_any().downcast_ref::<df_expr::CastExpr>() {
187 let cast_dtype = DType::from_arrow((cast_expr.cast_type(), Nullability::Nullable));
188 let child = self.convert(cast_expr.expr().as_ref())?;
189 return Ok(cast(child, cast_dtype));
190 }
191
192 if let Some(cast_col_expr) = df.as_any().downcast_ref::<df_expr::CastColumnExpr>() {
193 let target = cast_col_expr.target_field();
194
195 let target_dtype = DType::from_arrow((target.data_type(), target.is_nullable().into()));
196 let child = self.convert(cast_col_expr.expr().as_ref())?;
197 return Ok(cast(child, target_dtype));
198 }
199
200 if let Some(is_null_expr) = df.as_any().downcast_ref::<df_expr::IsNullExpr>() {
201 let arg = self.convert(is_null_expr.arg().as_ref())?;
202 return Ok(is_null(arg));
203 }
204
205 if let Some(is_not_null_expr) = df.as_any().downcast_ref::<df_expr::IsNotNullExpr>() {
206 let arg = self.convert(is_not_null_expr.arg().as_ref())?;
207 return Ok(not(is_null(arg)));
208 }
209
210 if let Some(in_list) = df.as_any().downcast_ref::<df_expr::InListExpr>() {
211 let value = self.convert(in_list.expr().as_ref())?;
212 let list_elements: Vec<_> = in_list
213 .list()
214 .iter()
215 .map(|e| {
216 if let Some(lit) = e.as_any().downcast_ref::<df_expr::Literal>() {
217 Ok(Scalar::from_df(lit.value()))
218 } else {
219 Err(exec_datafusion_err!("Failed to cast sub-expression"))
220 }
221 })
222 .try_collect()?;
223
224 let list = Scalar::list(
225 list_elements[0].dtype().clone(),
226 list_elements,
227 Nullability::Nullable,
228 );
229 let expr = list_contains(lit(list), value);
230
231 return Ok(if in_list.negated() { not(expr) } else { expr });
232 }
233
234 if let Some(scalar_fn) = df.as_any().downcast_ref::<ScalarFunctionExpr>() {
235 return self.try_convert_scalar_function(scalar_fn);
236 }
237
238 Err(exec_datafusion_err!(
239 "Couldn't convert DataFusion physical {df} expression to a vortex expression"
240 ))
241 }
242
243 fn split_projection(
244 &self,
245 source_projection: ProjectionExprs,
246 input_schema: &Schema,
247 output_schema: &Schema,
248 ) -> DFResult<ProcessedProjection> {
249 let mut scan_projection = vec![];
250 let mut leftover_projection: Vec<ProjectionExpr> = vec![];
251
252 for projection_expr in source_projection.iter() {
253 let r = projection_expr.expr.apply(|node| {
254 if let Some(scalar_fn_expr) = node.as_any().downcast_ref::<ScalarFunctionExpr>()
256 && !can_scalar_fn_be_pushed_down(scalar_fn_expr)
257 {
258 scan_projection.extend(
259 collect_columns(node)
260 .into_iter()
261 .map(|c| (c.name().to_string(), get_item(c.name(), root()))),
262 );
263
264 leftover_projection.push(projection_expr.clone());
265 return Ok(TreeNodeRecursion::Stop);
266 }
267
268 if let Some(binary_expr) = node.as_any().downcast_ref::<df_expr::BinaryExpr>()
271 && binary_expr.op().is_numerical_operators()
272 && (is_decimal(&binary_expr.left().data_type(input_schema)?)
273 && is_decimal(&binary_expr.right().data_type(input_schema)?))
274 {
275 scan_projection.extend(
276 collect_columns(node)
277 .into_iter()
278 .map(|c| (c.name().to_string(), get_item(c.name(), root()))),
279 );
280
281 leftover_projection.push(projection_expr.clone());
282 return Ok(TreeNodeRecursion::Stop);
283 }
284
285 Ok(TreeNodeRecursion::Continue)
286 })?;
287
288 if matches!(r, TreeNodeRecursion::Continue) {
290 scan_projection.push((
291 projection_expr.alias.clone(),
292 self.convert(projection_expr.expr.as_ref())?,
293 ));
294 leftover_projection.push(ProjectionExpr {
295 expr: Arc::new(df_expr::Column::new_with_schema(
296 projection_expr.alias.as_str(),
297 output_schema,
298 )?),
299 alias: projection_expr.alias.clone(),
300 });
301 }
302 }
303
304 Ok(ProcessedProjection {
305 scan_projection: pack(scan_projection, Nullability::NonNullable),
306 leftover_projection: leftover_projection.into(),
307 })
308 }
309}
310
/// Map a DataFusion binary operator to its vortex [`Operator`] equivalent.
///
/// Operators with no vortex counterpart (regex/like matches, bitwise ops,
/// modulo, JSON/array operators, ...) return an error; callers use this both
/// for conversion and as a pushdown-capability check.
fn try_operator_from_df(value: &DFOperator) -> DFResult<Operator> {
    match value {
        DFOperator::Eq => Ok(Operator::Eq),
        DFOperator::NotEq => Ok(Operator::NotEq),
        DFOperator::Lt => Ok(Operator::Lt),
        DFOperator::LtEq => Ok(Operator::Lte),
        DFOperator::Gt => Ok(Operator::Gt),
        DFOperator::GtEq => Ok(Operator::Gte),
        DFOperator::And => Ok(Operator::And),
        DFOperator::Or => Ok(Operator::Or),
        DFOperator::Plus => Ok(Operator::Add),
        DFOperator::Minus => Ok(Operator::Sub),
        DFOperator::Multiply => Ok(Operator::Mul),
        DFOperator::Divide => Ok(Operator::Div),
        // Deliberately exhaustive (no `_` arm) so that adding a DataFusion
        // operator forces a decision here.
        DFOperator::IsDistinctFrom
        | DFOperator::IsNotDistinctFrom
        | DFOperator::RegexMatch
        | DFOperator::RegexIMatch
        | DFOperator::RegexNotMatch
        | DFOperator::RegexNotIMatch
        | DFOperator::LikeMatch
        | DFOperator::ILikeMatch
        | DFOperator::NotLikeMatch
        | DFOperator::NotILikeMatch
        | DFOperator::BitwiseAnd
        | DFOperator::BitwiseOr
        | DFOperator::BitwiseXor
        | DFOperator::BitwiseShiftRight
        | DFOperator::BitwiseShiftLeft
        | DFOperator::StringConcat
        | DFOperator::AtArrow
        | DFOperator::ArrowAt
        | DFOperator::Modulo
        | DFOperator::Arrow
        | DFOperator::LongArrow
        | DFOperator::HashArrow
        | DFOperator::HashLongArrow
        | DFOperator::AtAt
        | DFOperator::IntegerDivide
        | DFOperator::HashMinus
        | DFOperator::AtQuestion
        | DFOperator::Question
        | DFOperator::QuestionAnd
        | DFOperator::QuestionPipe => {
            tracing::debug!(operator = %value, "Can't pushdown binary_operator operator");
            Err(exec_datafusion_err!(
                "Unsupported datafusion operator {value}"
            ))
        }
    }
}
362
/// Recursively check whether `df_expr` can be converted by
/// [`DefaultExpressionConvertor::convert`] and evaluated by the scan.
fn can_be_pushed_down_impl(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
    // Dynamic expressions (e.g. runtime filters) change after planning and
    // cannot be frozen into a scan-side expression.
    if is_dynamic_physical_expr(df_expr) {
        return false;
    }

    let expr = df_expr.as_any();
    if let Some(binary) = expr.downcast_ref::<df_expr::BinaryExpr>() {
        can_binary_be_pushed_down(binary, schema)
    } else if let Some(col) = expr.downcast_ref::<df_expr::Column>() {
        // Column must exist in the schema and have a supported type.
        schema
            .field_with_name(col.name())
            .ok()
            .is_some_and(|field| supported_data_types(field.data_type()))
    } else if let Some(like) = expr.downcast_ref::<df_expr::LikeExpr>() {
        can_be_pushed_down_impl(like.expr(), schema)
            && can_be_pushed_down_impl(like.pattern(), schema)
    } else if let Some(lit) = expr.downcast_ref::<df_expr::Literal>() {
        supported_data_types(&lit.value().data_type())
    } else if expr.downcast_ref::<df_expr::CastExpr>().is_some()
        || expr.downcast_ref::<df_expr::CastColumnExpr>().is_some()
    {
        // NOTE(review): cast expressions are accepted without checking their
        // child expression — presumably the child is validated elsewhere;
        // verify this can't let an unconvertible child through.
        true
    } else if let Some(is_null) = expr.downcast_ref::<df_expr::IsNullExpr>() {
        can_be_pushed_down_impl(is_null.arg(), schema)
    } else if let Some(is_not_null) = expr.downcast_ref::<df_expr::IsNotNullExpr>() {
        can_be_pushed_down_impl(is_not_null.arg(), schema)
    } else if let Some(in_list) = expr.downcast_ref::<df_expr::InListExpr>() {
        can_be_pushed_down_impl(in_list.expr(), schema)
            && in_list
                .list()
                .iter()
                .all(|e| can_be_pushed_down_impl(e, schema))
    } else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
        can_scalar_fn_be_pushed_down(scalar_fn)
    } else {
        tracing::debug!(%df_expr, "DataFusion expression can't be pushed down");
        false
    }
}
404
405fn can_binary_be_pushed_down(binary: &df_expr::BinaryExpr, schema: &Schema) -> bool {
406 let is_op_supported = try_operator_from_df(binary.op()).is_ok();
407 is_op_supported
408 && can_be_pushed_down_impl(binary.left(), schema)
409 && can_be_pushed_down_impl(binary.right(), schema)
410}
411
412fn supported_data_types(dt: &DataType) -> bool {
413 use DataType::*;
414
415 if let Dictionary(_, value_type) = dt {
417 return supported_data_types(value_type.as_ref());
418 }
419
420 let is_supported = dt.is_null()
421 || dt.is_numeric()
422 || matches!(
423 dt,
424 Boolean
425 | Utf8
426 | LargeUtf8
427 | Utf8View
428 | Binary
429 | LargeBinary
430 | BinaryView
431 | Date32
432 | Date64
433 | Timestamp(_, _)
434 | Time32(_)
435 | Time64(_)
436 );
437
438 if !is_supported {
439 tracing::debug!("DataFusion data type {dt:?} is not supported");
440 }
441
442 is_supported
443}
444
/// `get_field` is the only scalar function the scan can evaluate.
fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr) -> bool {
    ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn).is_some()
}
449
450fn is_decimal(dt: &DataType) -> bool {
452 matches!(
453 dt,
454 DataType::Decimal32(_, _)
455 | DataType::Decimal64(_, _)
456 | DataType::Decimal128(_, _)
457 | DataType::Decimal256(_, _)
458 )
459}
460
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use arrow_schema::TimeUnit as ArrowTimeUnit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::Operator as DFOperator;
    use datafusion_physical_expr::PhysicalExpr;
    use datafusion_physical_plan::expressions as df_expr;
    use insta::assert_snapshot;
    use rstest::rstest;

    use super::*;
    use crate::common_tests::TestSessionContext;

    /// Schema fixture covering supported scalar types plus one unsupported
    /// nested (list) column.
    #[rstest::fixture]
    fn test_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("score", DataType::Float64, true),
            Field::new("active", DataType::Boolean, false),
            Field::new(
                "created_at",
                DataType::Timestamp(ArrowTimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "unsupported_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
        ])
    }

    #[test]
    fn test_make_vortex_predicate_empty() {
        let expr_convertor = DefaultExpressionConvertor::default();
        let result = make_vortex_predicate(&expr_convertor, &[]).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_make_vortex_predicate_single() {
        let expr_convertor = DefaultExpressionConvertor::default();
        let col_expr = Arc::new(df_expr::Column::new("test", 0)) as Arc<dyn PhysicalExpr>;
        let result = make_vortex_predicate(&expr_convertor, &[col_expr]).unwrap();
        assert!(result.is_some());
    }

    #[test]
    fn test_make_vortex_predicate_multiple() {
        let expr_convertor = DefaultExpressionConvertor::default();
        let col1 = Arc::new(df_expr::Column::new("col1", 0)) as Arc<dyn PhysicalExpr>;
        let col2 = Arc::new(df_expr::Column::new("col2", 1)) as Arc<dyn PhysicalExpr>;
        let result = make_vortex_predicate(&expr_convertor, &[col1, col2]).unwrap();
        assert!(result.is_some());
    }

    // NOTE: the arithmetic case names were previously all `plus` (copy-paste),
    // which produced misleading generated test names; fixed to match each op.
    #[rstest]
    #[case::eq(DFOperator::Eq, Operator::Eq)]
    #[case::not_eq(DFOperator::NotEq, Operator::NotEq)]
    #[case::lt(DFOperator::Lt, Operator::Lt)]
    #[case::lte(DFOperator::LtEq, Operator::Lte)]
    #[case::gt(DFOperator::Gt, Operator::Gt)]
    #[case::gte(DFOperator::GtEq, Operator::Gte)]
    #[case::and(DFOperator::And, Operator::And)]
    #[case::or(DFOperator::Or, Operator::Or)]
    #[case::plus(DFOperator::Plus, Operator::Add)]
    #[case::minus(DFOperator::Minus, Operator::Sub)]
    #[case::multiply(DFOperator::Multiply, Operator::Mul)]
    #[case::divide(DFOperator::Divide, Operator::Div)]
    fn test_operator_conversion_supported(
        #[case] df_op: DFOperator,
        #[case] expected_vortex_op: Operator,
    ) {
        let result = try_operator_from_df(&df_op).unwrap();
        assert_eq!(result, expected_vortex_op);
    }

    #[rstest]
    #[case::modulo(DFOperator::Modulo)]
    #[case::bitwise_and(DFOperator::BitwiseAnd)]
    #[case::regex_match(DFOperator::RegexMatch)]
    #[case::like_match(DFOperator::LikeMatch)]
    fn test_operator_conversion_unsupported(#[case] df_op: DFOperator) {
        let result = try_operator_from_df(&df_op);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Unsupported datafusion operator")
        );
    }

    #[test]
    fn test_expr_from_df_column() {
        let col_expr = df_expr::Column::new("test_column", 0);
        let result = DefaultExpressionConvertor::default()
            .convert(&col_expr)
            .unwrap();

        assert_snapshot!(result.display_tree().to_string(), @r"
        vortex.get_item(test_column)
        └── input: vortex.root()
        ");
    }

    #[test]
    fn test_expr_from_df_literal() {
        let literal_expr = df_expr::Literal::new(ScalarValue::Int32(Some(42)));
        let result = DefaultExpressionConvertor::default()
            .convert(&literal_expr)
            .unwrap();

        assert_snapshot!(result.display_tree().to_string(), @"vortex.literal(42i32)");
    }

    #[test]
    fn test_expr_from_df_binary() {
        let left = Arc::new(df_expr::Column::new("left", 0)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = df_expr::BinaryExpr::new(left, DFOperator::Eq, right);

        let result = DefaultExpressionConvertor::default()
            .convert(&binary_expr)
            .unwrap();

        assert_snapshot!(result.display_tree().to_string(), @r"
        vortex.binary(=)
        ├── lhs: vortex.get_item(left)
        │   └── input: vortex.root()
        └── rhs: vortex.literal(42i32)
        ");
    }

    #[rstest]
    #[case::like_normal(false, false)]
    #[case::like_negated(true, false)]
    #[case::like_case_insensitive(false, true)]
    #[case::like_negated_case_insensitive(true, true)]
    fn test_expr_from_df_like(#[case] negated: bool, #[case] case_insensitive: bool) {
        let expr = Arc::new(df_expr::Column::new("text_col", 0)) as Arc<dyn PhysicalExpr>;
        let pattern = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
            "test%".to_string(),
        )))) as Arc<dyn PhysicalExpr>;
        let like_expr = df_expr::LikeExpr::new(negated, case_insensitive, expr, pattern);

        let result = DefaultExpressionConvertor::default()
            .convert(&like_expr)
            .unwrap();
        let like_opts = result.as_::<Like>();
        assert_eq!(
            like_opts,
            &LikeOptions {
                negated,
                case_insensitive
            }
        );
    }

    #[rstest]
    #[case::null(DataType::Null, true)]
    #[case::boolean(DataType::Boolean, true)]
    #[case::int8(DataType::Int8, true)]
    #[case::int16(DataType::Int16, true)]
    #[case::int32(DataType::Int32, true)]
    #[case::int64(DataType::Int64, true)]
    #[case::uint8(DataType::UInt8, true)]
    #[case::uint16(DataType::UInt16, true)]
    #[case::uint32(DataType::UInt32, true)]
    #[case::uint64(DataType::UInt64, true)]
    #[case::float32(DataType::Float32, true)]
    #[case::float64(DataType::Float64, true)]
    #[case::utf8(DataType::Utf8, true)]
    #[case::utf8_view(DataType::Utf8View, true)]
    #[case::binary(DataType::Binary, true)]
    #[case::binary_view(DataType::BinaryView, true)]
    #[case::date32(DataType::Date32, true)]
    #[case::date64(DataType::Date64, true)]
    #[case::timestamp_ms(DataType::Timestamp(ArrowTimeUnit::Millisecond, None), true)]
    #[case::timestamp_us(
        DataType::Timestamp(ArrowTimeUnit::Microsecond, Some(Arc::from("UTC"))),
        true
    )]
    #[case::time32_s(DataType::Time32(ArrowTimeUnit::Second), true)]
    #[case::time64_ns(DataType::Time64(ArrowTimeUnit::Nanosecond), true)]
    #[case::list(
        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
        false
    )]
    #[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()
    ), false)]
    #[case::dict_utf8(
        DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
        true
    )]
    #[case::dict_int32(
        DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Int32)),
        true
    )]
    #[case::dict_unsupported(
        DataType::Dictionary(
            Box::new(DataType::UInt32),
            Box::new(DataType::List(Arc::new(Field::new("item", DataType::Int32, true))))
        ),
        false
    )]
    fn test_supported_data_types(#[case] data_type: DataType, #[case] expected: bool) {
        assert_eq!(supported_data_types(&data_type), expected);
    }

    #[rstest]
    fn test_can_be_pushed_down_column_supported(test_schema: Schema) {
        let col_expr = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down_impl(&col_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_column_unsupported_type(test_schema: Schema) {
        let col_expr =
            Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down_impl(&col_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_column_not_found(test_schema: Schema) {
        let col_expr = Arc::new(df_expr::Column::new("nonexistent", 99)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down_impl(&col_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_literal_supported(test_schema: Schema) {
        let lit_expr =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down_impl(&lit_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_literal_unsupported(test_schema: Schema) {
        let unsupported_literal = ScalarValue::DurationSecond(Some(42));
        let lit_expr =
            Arc::new(df_expr::Literal::new(unsupported_literal)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down_impl(&lit_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_binary_supported(test_schema: Schema) {
        let left = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = Arc::new(df_expr::BinaryExpr::new(left, DFOperator::Eq, right))
            as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down_impl(&binary_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_binary_unsupported_operator(test_schema: Schema) {
        let left = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = Arc::new(df_expr::BinaryExpr::new(
            left,
            DFOperator::AtQuestion,
            right,
        )) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down_impl(&binary_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_binary_unsupported_operand(test_schema: Schema) {
        let left = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc<dyn PhysicalExpr>;
        let right =
            Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
        let binary_expr = Arc::new(df_expr::BinaryExpr::new(left, DFOperator::Eq, right))
            as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down_impl(&binary_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_like_supported(test_schema: Schema) {
        let expr = Arc::new(df_expr::Column::new("name", 1)) as Arc<dyn PhysicalExpr>;
        let pattern = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
            "test%".to_string(),
        )))) as Arc<dyn PhysicalExpr>;
        let like_expr =
            Arc::new(df_expr::LikeExpr::new(false, false, expr, pattern)) as Arc<dyn PhysicalExpr>;

        assert!(can_be_pushed_down_impl(&like_expr, &test_schema));
    }

    #[rstest]
    fn test_can_be_pushed_down_like_unsupported_operand(test_schema: Schema) {
        let expr = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc<dyn PhysicalExpr>;
        let pattern = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
            "test%".to_string(),
        )))) as Arc<dyn PhysicalExpr>;
        let like_expr =
            Arc::new(df_expr::LikeExpr::new(false, false, expr, pattern)) as Arc<dyn PhysicalExpr>;

        assert!(!can_be_pushed_down_impl(&like_expr, &test_schema));
    }

    /// End-to-end sanity check: casts work whether pushed down (projection)
    /// or kept in the predicate.
    #[tokio::test]
    async fn test_cast_int_to_string() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        ctx.session
            .sql(r#"copy (select 1 as id) to 'example.vortex'"#)
            .await?
            .show()
            .await?;

        ctx.session
            .sql(r#"select cast(id as string) as sid from 'example.vortex' where id > 0"#)
            .await?
            .show()
            .await?;

        ctx.session
            .sql(r#"select id from 'example.vortex' where cast (id as string) == '1'"#)
            .await?
            .show()
            .await?;

        ctx.session
            .sql(r#"select cast(id as string) from 'example.vortex'"#)
            .await?
            .collect()
            .await?;

        Ok(())
    }
}