1use std::any::Any;
19use std::fmt;
20use std::hash::Hash;
21use std::sync::Arc;
22
23use crate::physical_expr::PhysicalExpr;
24
25use arrow::compute::{can_cast_types, CastOptions};
26use arrow::datatypes::{DataType, DataType::*, FieldRef, Schema};
27use arrow::record_batch::RecordBatch;
28use datafusion_common::format::DEFAULT_FORMAT_OPTIONS;
29use datafusion_common::{not_impl_err, Result};
30use datafusion_expr_common::columnar_value::ColumnarValue;
31use datafusion_expr_common::interval_arithmetic::Interval;
32use datafusion_expr_common::sort_properties::ExprProperties;
33
/// Cast options used by [`CastExpr::new`] when the caller supplies none.
///
/// `safe` is `false`: as the decimal overflow test in this file shows, a value
/// that does not fit the target type makes evaluation return an error instead
/// of producing NULL.
const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions {
    safe: false,
    format_options: DEFAULT_FORMAT_OPTIONS,
};
38
/// Cast options with `safe` set to `true`.
///
/// As the decimal overflow test in this file shows, a value that does not fit
/// the target type becomes NULL instead of raising an error. Used when casting
/// an interval back to a child's type in `propagate_constraints`.
const DEFAULT_SAFE_CAST_OPTIONS: CastOptions<'static> = CastOptions {
    safe: true,
    format_options: DEFAULT_FORMAT_OPTIONS,
};
43
/// Physical expression that evaluates a child expression and casts its result
/// to `cast_type` using `cast_options`.
#[derive(Debug, Clone, Eq)]
pub struct CastExpr {
    /// The child expression whose output is cast.
    pub expr: Arc<dyn PhysicalExpr>,
    /// The data type the child's output is cast to.
    cast_type: DataType,
    /// Options passed to the cast when the expression is evaluated.
    cast_options: CastOptions<'static>,
}
54
55impl PartialEq for CastExpr {
57 fn eq(&self, other: &Self) -> bool {
58 self.expr.eq(&other.expr)
59 && self.cast_type.eq(&other.cast_type)
60 && self.cast_options.eq(&other.cast_options)
61 }
62}
63
64impl Hash for CastExpr {
65 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
66 self.expr.hash(state);
67 self.cast_type.hash(state);
68 self.cast_options.hash(state);
69 }
70}
71
72impl CastExpr {
73 pub fn new(
75 expr: Arc<dyn PhysicalExpr>,
76 cast_type: DataType,
77 cast_options: Option<CastOptions<'static>>,
78 ) -> Self {
79 Self {
80 expr,
81 cast_type,
82 cast_options: cast_options.unwrap_or(DEFAULT_CAST_OPTIONS),
83 }
84 }
85
86 pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
88 &self.expr
89 }
90
91 pub fn cast_type(&self) -> &DataType {
93 &self.cast_type
94 }
95
96 pub fn cast_options(&self) -> &CastOptions<'static> {
98 &self.cast_options
99 }
100
101 pub fn is_bigger_cast(&self, src: &DataType) -> bool {
103 if self.cast_type.eq(src) {
104 return true;
105 }
106 matches!(
107 (src, &self.cast_type),
108 (Int8, Int16 | Int32 | Int64)
109 | (Int16, Int32 | Int64)
110 | (Int32, Int64)
111 | (UInt8, UInt16 | UInt32 | UInt64)
112 | (UInt16, UInt32 | UInt64)
113 | (UInt32, UInt64)
114 | (
115 Int8 | Int16 | Int32 | UInt8 | UInt16 | UInt32,
116 Float32 | Float64
117 )
118 | (Int64 | UInt64, Float64)
119 | (Utf8, LargeUtf8)
120 )
121 }
122}
123
124impl fmt::Display for CastExpr {
125 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
126 write!(f, "CAST({} AS {:?})", self.expr, self.cast_type)
127 }
128}
129
130impl PhysicalExpr for CastExpr {
131 fn as_any(&self) -> &dyn Any {
133 self
134 }
135
136 fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
137 Ok(self.cast_type.clone())
138 }
139
140 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
141 self.expr.nullable(input_schema)
142 }
143
144 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
145 let value = self.expr.evaluate(batch)?;
146 value.cast_to(&self.cast_type, Some(&self.cast_options))
147 }
148
149 fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
150 Ok(self
151 .expr
152 .return_field(input_schema)?
153 .as_ref()
154 .clone()
155 .with_data_type(self.cast_type.clone())
156 .into())
157 }
158
159 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
160 vec![&self.expr]
161 }
162
163 fn with_new_children(
164 self: Arc<Self>,
165 children: Vec<Arc<dyn PhysicalExpr>>,
166 ) -> Result<Arc<dyn PhysicalExpr>> {
167 Ok(Arc::new(CastExpr::new(
168 Arc::clone(&children[0]),
169 self.cast_type.clone(),
170 Some(self.cast_options.clone()),
171 )))
172 }
173
174 fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
175 children[0].cast_to(&self.cast_type, &self.cast_options)
177 }
178
179 fn propagate_constraints(
180 &self,
181 interval: &Interval,
182 children: &[&Interval],
183 ) -> Result<Option<Vec<Interval>>> {
184 let child_interval = children[0];
185 let cast_type = child_interval.data_type();
187 Ok(Some(vec![
188 interval.cast_to(&cast_type, &DEFAULT_SAFE_CAST_OPTIONS)?
189 ]))
190 }
191
192 fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
195 let source_datatype = children[0].range.data_type();
196 let target_type = &self.cast_type;
197
198 let unbounded = Interval::make_unbounded(target_type)?;
199 if (source_datatype.is_numeric() || source_datatype == Boolean)
200 && target_type.is_numeric()
201 || source_datatype.is_temporal() && target_type.is_temporal()
202 || source_datatype.eq(target_type)
203 {
204 Ok(children[0].clone().with_range(unbounded))
205 } else {
206 Ok(ExprProperties::new_unknown().with_range(unbounded))
207 }
208 }
209
210 fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211 write!(f, "CAST(")?;
212 self.expr.fmt_sql(f)?;
213 write!(f, " AS {:?}", self.cast_type)?;
214
215 write!(f, ")")
216 }
217}
218
219pub fn cast_with_options(
224 expr: Arc<dyn PhysicalExpr>,
225 input_schema: &Schema,
226 cast_type: DataType,
227 cast_options: Option<CastOptions<'static>>,
228) -> Result<Arc<dyn PhysicalExpr>> {
229 let expr_type = expr.data_type(input_schema)?;
230 if expr_type == cast_type {
231 Ok(Arc::clone(&expr))
232 } else if can_cast_types(&expr_type, &cast_type) {
233 Ok(Arc::new(CastExpr::new(expr, cast_type, cast_options)))
234 } else {
235 not_impl_err!("Unsupported CAST from {expr_type} to {cast_type}")
236 }
237}
238
239pub fn cast(
244 expr: Arc<dyn PhysicalExpr>,
245 input_schema: &Schema,
246 cast_type: DataType,
247) -> Result<Arc<dyn PhysicalExpr>> {
248 cast_with_options(expr, input_schema, cast_type, None)
249}
250
#[cfg(test)]
mod tests {
    use super::*;

    use crate::expressions::column::col;

    use arrow::{
        array::{
            Array, Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array,
            Int64Array, Int8Array, StringArray, Time64NanosecondArray,
            TimestampNanosecondArray, UInt32Array,
        },
        datatypes::*,
    };
    use datafusion_physical_expr_common::physical_expr::fmt_sql;
    use insta::assert_snapshot;

    // End-to-end check of a cast from an already-built array (used for
    // decimal inputs, which need `with_precision_and_scale`):
    // 1. build a record batch with a nullable column "a" of type `$A_TYPE`
    //    holding `$DECIMAL_ARRAY`
    // 2. build `CAST(a AS $TYPE)` with `$CAST_OPTIONS`
    // 3. check the display string and the reported data type
    // 4. evaluate and check the result's type and row count
    // 5. downcast to `$TYPEARRAY` and compare every slot against `$VEC`
    //    (`None` in `$VEC` means the slot must be null)
    macro_rules! generic_decimal_to_other_test_cast {
        ($DECIMAL_ARRAY:ident, $A_TYPE:expr, $TYPEARRAY:ident, $TYPE:expr, $VEC:expr,$CAST_OPTIONS:expr) => {{
            let schema = Schema::new(vec![Field::new("a", $A_TYPE, true)]);
            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new($DECIMAL_ARRAY)],
            )?;
            let expression =
                cast_with_options(col("a", &schema)?, &schema, $TYPE, $CAST_OPTIONS)?;
            let expected = &$VEC;

            // Display form of the expression.
            assert_eq!(
                format!("CAST(a@0 AS {:?})", $TYPE),
                expression.to_string()
            );

            // Type reported before evaluation.
            assert_eq!(expression.data_type(&schema)?, $TYPE);

            let result = expression
                .evaluate(&batch)?
                .into_array(batch.num_rows())
                .expect("Failed to convert to array");

            // Type actually produced.
            assert_eq!(*result.data_type(), $TYPE);

            // Without this, rows beyond `expected.len()` would go unchecked.
            assert_eq!(result.len(), expected.len());

            let result = result
                .as_any()
                .downcast_ref::<$TYPEARRAY>()
                .expect("failed to downcast");

            for (i, x) in expected.iter().enumerate() {
                match x {
                    Some(x) => assert_eq!(result.value(i), *x),
                    None => assert!(!result.is_valid(i)),
                }
            }
        }};
    }

    // End-to-end check of a cast from a plain vector of values:
    // 1. build a record batch with a nullable column "a" of type `$A_TYPE`
    //    from `$A_ARRAY::from($A_VEC)`
    // 2. build `CAST(a AS $TYPE)` with `$CAST_OPTIONS`
    // 3. check the display string and the reported data type
    // 4. evaluate and check the result's type and row count
    // 5. downcast to `$TYPEARRAY` and compare every slot against `$VEC`
    //    (`None` in `$VEC` means the slot must be null)
    macro_rules! generic_test_cast {
        ($A_ARRAY:ident, $A_TYPE:expr, $A_VEC:expr, $TYPEARRAY:ident, $TYPE:expr, $VEC:expr, $CAST_OPTIONS:expr) => {{
            let schema = Schema::new(vec![Field::new("a", $A_TYPE, true)]);
            let a_vec_len = $A_VEC.len();
            let a = $A_ARRAY::from($A_VEC);
            let batch =
                RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;

            let expression =
                cast_with_options(col("a", &schema)?, &schema, $TYPE, $CAST_OPTIONS)?;
            let expected = &$VEC;

            // Display form of the expression.
            assert_eq!(
                format!("CAST(a@0 AS {:?})", $TYPE),
                expression.to_string()
            );

            // Type reported before evaluation.
            assert_eq!(expression.data_type(&schema)?, $TYPE);

            let result = expression
                .evaluate(&batch)?
                .into_array(batch.num_rows())
                .expect("Failed to convert to array");

            // Type actually produced.
            assert_eq!(*result.data_type(), $TYPE);

            // One output row per input row, and one expectation per row.
            assert_eq!(result.len(), a_vec_len);
            assert_eq!(result.len(), expected.len());

            let result = result
                .as_any()
                .downcast_ref::<$TYPEARRAY>()
                .expect("failed to downcast");

            for (i, x) in expected.iter().enumerate() {
                match x {
                    Some(x) => assert_eq!(result.value(i), *x),
                    None => assert!(!result.is_valid(i)),
                }
            }
        }};
    }

    /// Decimal-to-decimal casts: increasing scale multiplies the stored value,
    /// decreasing scale drops the extra digit; nulls stay null.
    #[test]
    fn test_cast_decimal_to_decimal() -> Result<()> {
        let array = vec![
            Some(1234),
            Some(2222),
            Some(3),
            Some(4000),
            Some(5000),
            None,
        ];

        let decimal_array = array
            .clone()
            .into_iter()
            .collect::<Decimal128Array>()
            .with_precision_and_scale(10, 3)?;

        generic_decimal_to_other_test_cast!(
            decimal_array,
            Decimal128(10, 3),
            Decimal128Array,
            Decimal128(20, 6),
            [
                Some(1_234_000),
                Some(2_222_000),
                Some(3_000),
                Some(4_000_000),
                Some(5_000_000),
                None
            ],
            None
        );

        let decimal_array = array
            .into_iter()
            .collect::<Decimal128Array>()
            .with_precision_and_scale(10, 3)?;

        generic_decimal_to_other_test_cast!(
            decimal_array,
            Decimal128(10, 3),
            Decimal128Array,
            Decimal128(10, 2),
            [Some(123), Some(222), Some(0), Some(400), Some(500), None],
            None
        );

        Ok(())
    }

    /// A value too large for the target precision errors with the default
    /// options and becomes NULL with `DEFAULT_SAFE_CAST_OPTIONS`.
    #[test]
    fn test_cast_decimal_to_decimal_overflow() -> Result<()> {
        let array = vec![Some(123456789)];

        let decimal_array = array
            .into_iter()
            .collect::<Decimal128Array>()
            .with_precision_and_scale(10, 3)?;

        let schema = Schema::new(vec![Field::new("a", Decimal128(10, 3), false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(decimal_array)],
        )?;

        // Default (non-safe) options: evaluation fails.
        let expression =
            cast_with_options(col("a", &schema)?, &schema, Decimal128(6, 2), None)?;
        let e = expression.evaluate(&batch).unwrap_err().strip_backtrace();
        assert_snapshot!(e, @"Arrow error: Invalid argument error: 123456.79 is too large to store in a Decimal128 of precision 6. Max is 9999.99");

        // Safe options: the overflowing value is replaced by NULL.
        let expression_safe = cast_with_options(
            col("a", &schema)?,
            &schema,
            Decimal128(6, 2),
            Some(DEFAULT_SAFE_CAST_OPTIONS),
        )?;
        let result_safe = expression_safe
            .evaluate(&batch)?
            .into_array(batch.num_rows())
            .expect("failed to convert to array");

        assert!(result_safe.is_null(0));

        Ok(())
    }

    /// Decimal to each signed integer width and to both float widths.
    #[test]
    fn test_cast_decimal_to_numeric() -> Result<()> {
        let array = vec![Some(1), Some(2), Some(3), Some(4), Some(5), None];

        // decimal -> i8
        let decimal_array = array
            .clone()
            .into_iter()
            .collect::<Decimal128Array>()
            .with_precision_and_scale(10, 0)?;
        generic_decimal_to_other_test_cast!(
            decimal_array,
            Decimal128(10, 0),
            Int8Array,
            Int8,
            [
                Some(1_i8),
                Some(2_i8),
                Some(3_i8),
                Some(4_i8),
                Some(5_i8),
                None
            ],
            None
        );

        // decimal -> i16
        let decimal_array = array
            .clone()
            .into_iter()
            .collect::<Decimal128Array>()
            .with_precision_and_scale(10, 0)?;
        generic_decimal_to_other_test_cast!(
            decimal_array,
            Decimal128(10, 0),
            Int16Array,
            Int16,
            [
                Some(1_i16),
                Some(2_i16),
                Some(3_i16),
                Some(4_i16),
                Some(5_i16),
                None
            ],
            None
        );

        // decimal -> i32
        let decimal_array = array
            .clone()
            .into_iter()
            .collect::<Decimal128Array>()
            .with_precision_and_scale(10, 0)?;
        generic_decimal_to_other_test_cast!(
            decimal_array,
            Decimal128(10, 0),
            Int32Array,
            Int32,
            [
                Some(1_i32),
                Some(2_i32),
                Some(3_i32),
                Some(4_i32),
                Some(5_i32),
                None
            ],
            None
        );

        // decimal -> i64
        let decimal_array = array
            .into_iter()
            .collect::<Decimal128Array>()
            .with_precision_and_scale(10, 0)?;
        generic_decimal_to_other_test_cast!(
            decimal_array,
            Decimal128(10, 0),
            Int64Array,
            Int64,
            [
                Some(1_i64),
                Some(2_i64),
                Some(3_i64),
                Some(4_i64),
                Some(5_i64),
                None
            ],
            None
        );

        // decimal -> f32, with a non-zero scale
        let array = vec![
            Some(1234),
            Some(2222),
            Some(3),
            Some(4000),
            Some(5000),
            None,
        ];
        let decimal_array = array
            .clone()
            .into_iter()
            .collect::<Decimal128Array>()
            .with_precision_and_scale(10, 3)?;
        generic_decimal_to_other_test_cast!(
            decimal_array,
            Decimal128(10, 3),
            Float32Array,
            Float32,
            [
                Some(1.234_f32),
                Some(2.222_f32),
                Some(0.003_f32),
                Some(4.0_f32),
                Some(5.0_f32),
                None
            ],
            None
        );

        // decimal -> f64, same raw values with a larger scale
        let decimal_array = array
            .into_iter()
            .collect::<Decimal128Array>()
            .with_precision_and_scale(20, 6)?;
        generic_decimal_to_other_test_cast!(
            decimal_array,
            Decimal128(20, 6),
            Float64Array,
            Float64,
            [
                Some(0.001234_f64),
                Some(0.002222_f64),
                Some(0.000003_f64),
                Some(0.004_f64),
                Some(0.005_f64),
                None
            ],
            None
        );
        Ok(())
    }

    /// Each signed integer width and both float widths to decimal.
    #[test]
    fn test_cast_numeric_to_decimal() -> Result<()> {
        // i8 -> decimal
        generic_test_cast!(
            Int8Array,
            Int8,
            vec![1, 2, 3, 4, 5],
            Decimal128Array,
            Decimal128(3, 0),
            [Some(1), Some(2), Some(3), Some(4), Some(5)],
            None
        );

        // i16 -> decimal
        generic_test_cast!(
            Int16Array,
            Int16,
            vec![1, 2, 3, 4, 5],
            Decimal128Array,
            Decimal128(5, 0),
            [Some(1), Some(2), Some(3), Some(4), Some(5)],
            None
        );

        // i32 -> decimal
        generic_test_cast!(
            Int32Array,
            Int32,
            vec![1, 2, 3, 4, 5],
            Decimal128Array,
            Decimal128(10, 0),
            [Some(1), Some(2), Some(3), Some(4), Some(5)],
            None
        );

        // i64 -> decimal
        generic_test_cast!(
            Int64Array,
            Int64,
            vec![1, 2, 3, 4, 5],
            Decimal128Array,
            Decimal128(20, 0),
            [Some(1), Some(2), Some(3), Some(4), Some(5)],
            None
        );

        // i64 -> decimal with a non-zero scale
        generic_test_cast!(
            Int64Array,
            Int64,
            vec![1, 2, 3, 4, 5],
            Decimal128Array,
            Decimal128(20, 2),
            [Some(100), Some(200), Some(300), Some(400), Some(500)],
            None
        );

        // f32 -> decimal
        generic_test_cast!(
            Float32Array,
            Float32,
            vec![1.5, 2.5, 3.0, 1.123_456_8, 5.50],
            Decimal128Array,
            Decimal128(10, 2),
            [Some(150), Some(250), Some(300), Some(112), Some(550)],
            None
        );

        // f64 -> decimal
        generic_test_cast!(
            Float64Array,
            Float64,
            vec![1.5, 2.5, 3.0, 1.123_456_8, 5.50],
            Decimal128Array,
            Decimal128(20, 4),
            [
                Some(15000),
                Some(25000),
                Some(30000),
                Some(11235),
                Some(55000)
            ],
            None
        );
        Ok(())
    }

    /// `Int32` to `UInt32` keeps small positive values unchanged.
    #[test]
    fn test_cast_i32_u32() -> Result<()> {
        generic_test_cast!(
            Int32Array,
            Int32,
            vec![1, 2, 3, 4, 5],
            UInt32Array,
            UInt32,
            [
                Some(1_u32),
                Some(2_u32),
                Some(3_u32),
                Some(4_u32),
                Some(5_u32)
            ],
            None
        );
        Ok(())
    }

    /// `Int32` to `Utf8` yields the decimal string of each value.
    #[test]
    fn test_cast_i32_utf8() -> Result<()> {
        generic_test_cast!(
            Int32Array,
            Int32,
            vec![1, 2, 3, 4, 5],
            StringArray,
            Utf8,
            [Some("1"), Some("2"), Some("3"), Some("4"), Some("5")],
            None
        );
        Ok(())
    }

    /// `Int64` to nanosecond `Timestamp`; the expected values are the input
    /// values read back through a `Time64NanosecondArray`.
    #[test]
    fn test_cast_i64_t64() -> Result<()> {
        let original = vec![1, 2, 3, 4, 5];
        let expected: Vec<Option<i64>> = original
            .iter()
            .map(|i| Some(Time64NanosecondArray::from(vec![*i]).value(0)))
            .collect();
        generic_test_cast!(
            Int64Array,
            Int64,
            original,
            TimestampNanosecondArray,
            Timestamp(TimeUnit::Nanosecond, None),
            expected,
            None
        );
        Ok(())
    }

    /// An unsupported cast is rejected when the expression is built, before
    /// any data is evaluated.
    #[test]
    fn invalid_cast() {
        let schema = Schema::new(vec![Field::new("a", Int32, false)]);

        let result = cast(
            col("a", &schema).unwrap(),
            &schema,
            Interval(IntervalUnit::MonthDayNano),
        );
        result.expect_err("expected Invalid CAST");
    }

    /// With the default options, a string that is not a valid `Int32` makes
    /// evaluation fail with a descriptive error.
    #[test]
    fn invalid_cast_with_options_error() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", Utf8, false)]);
        let a = StringArray::from(vec!["9.1"]);
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
        let expression = cast_with_options(col("a", &schema)?, &schema, Int32, None)?;

        let err = expression.evaluate(&batch).expect_err("expected error");
        assert!(err
            .to_string()
            .contains("Cannot cast string '9.1' to value of Int32 type"));
        Ok(())
    }

    // NOTE(review): this test is ignored without a recorded reason. Confirm
    // why and reference a tracking issue here.
    #[test]
    #[ignore]
    fn test_cast_decimal() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", Int64, false)]);
        let a = Int64Array::from(vec![100]);
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
        let expression =
            cast_with_options(col("a", &schema)?, &schema, Decimal128(38, 38), None)?;
        expression.evaluate(&batch)?;
        Ok(())
    }

    /// `Display` includes the column index (`a@0`); `fmt_sql` prints only the
    /// column name.
    #[test]
    fn test_fmt_sql() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", Int32, true)]);

        // Numeric widening cast.
        let expr = cast(col("a", &schema)?, &schema, Int64)?;
        let display_string = expr.to_string();
        assert_eq!(display_string, "CAST(a@0 AS Int64)");
        let sql_string = fmt_sql(expr.as_ref()).to_string();
        assert_eq!(sql_string, "CAST(a AS Int64)");

        // String to integer cast.
        let schema = Schema::new(vec![Field::new("b", Utf8, true)]);
        let expr = cast(col("b", &schema)?, &schema, Int32)?;
        let display_string = expr.to_string();
        assert_eq!(display_string, "CAST(b@0 AS Int32)");
        let sql_string = fmt_sql(expr.as_ref()).to_string();
        assert_eq!(sql_string, "CAST(b AS Int32)");

        Ok(())
    }
}