1use arrow::{
21 array::{Array, ArrayRef, Date32Array, Date64Array, NullArray},
22 compute::{CastOptions, kernels, max, min},
23 datatypes::DataType,
24 util::pretty::pretty_format_columns,
25};
26use datafusion_common::internal_datafusion_err;
27use datafusion_common::{
28 Result, ScalarValue,
29 format::DEFAULT_CAST_OPTIONS,
30 internal_err,
31 scalar::{date_to_timestamp_multiplier, ensure_timestamp_in_bounds},
32};
33use std::fmt;
34use std::sync::Arc;
35
36#[derive(Clone, Debug)]
96pub enum ColumnarValue {
97 Array(ArrayRef),
99 Scalar(ScalarValue),
101}
102
103impl From<ArrayRef> for ColumnarValue {
104 fn from(value: ArrayRef) -> Self {
105 ColumnarValue::Array(value)
106 }
107}
108
109impl From<ScalarValue> for ColumnarValue {
110 fn from(value: ScalarValue) -> Self {
111 ColumnarValue::Scalar(value)
112 }
113}
114
115impl ColumnarValue {
116 pub fn data_type(&self) -> DataType {
117 match self {
118 ColumnarValue::Array(array_value) => array_value.data_type().clone(),
119 ColumnarValue::Scalar(scalar_value) => scalar_value.data_type(),
120 }
121 }
122
123 pub fn into_array(self, num_rows: usize) -> Result<ArrayRef> {
137 Ok(match self {
138 ColumnarValue::Array(array) => array,
139 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
140 })
141 }
142
143 pub fn into_array_of_size(self, num_rows: usize) -> Result<ArrayRef> {
157 match self {
158 ColumnarValue::Array(array) => {
159 if array.len() == num_rows {
160 Ok(array)
161 } else {
162 internal_err!(
163 "Array length {} does not match expected length {}",
164 array.len(),
165 num_rows
166 )
167 }
168 }
169 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
170 }
171 }
172
173 pub fn to_array(&self, num_rows: usize) -> Result<ArrayRef> {
187 Ok(match self {
188 ColumnarValue::Array(array) => Arc::clone(array),
189 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
190 })
191 }
192
193 pub fn to_array_of_size(&self, num_rows: usize) -> Result<ArrayRef> {
207 match self {
208 ColumnarValue::Array(array) => {
209 if array.len() == num_rows {
210 Ok(Arc::clone(array))
211 } else {
212 internal_err!(
213 "Array length {} does not match expected length {}",
214 array.len(),
215 num_rows
216 )
217 }
218 }
219 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
220 }
221 }
222
223 pub fn create_null_array(num_rows: usize) -> Self {
226 ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
227 }
228
229 pub fn values_to_arrays(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
244 if args.is_empty() {
245 return Ok(vec![]);
246 }
247
248 let mut array_len = None;
249 for arg in args {
250 array_len = match (arg, array_len) {
251 (ColumnarValue::Array(a), None) => Some(a.len()),
252 (ColumnarValue::Array(a), Some(array_len)) => {
253 if array_len == a.len() {
254 Some(array_len)
255 } else {
256 return internal_err!(
257 "Arguments has mixed length. Expected length: {array_len}, found length: {}",
258 a.len()
259 );
260 }
261 }
262 (ColumnarValue::Scalar(_), array_len) => array_len,
263 }
264 }
265
266 let inferred_length = array_len.unwrap_or(1);
268
269 let args = args
270 .iter()
271 .map(|arg| arg.to_array(inferred_length))
272 .collect::<Result<Vec<_>>>()?;
273
274 Ok(args)
275 }
276
277 pub fn cast_to(
289 &self,
290 cast_type: &DataType,
291 cast_options: Option<&CastOptions<'static>>,
292 ) -> Result<ColumnarValue> {
293 let cast_options = cast_options.cloned().unwrap_or(DEFAULT_CAST_OPTIONS);
294 match self {
295 ColumnarValue::Array(array) => {
296 let casted = cast_array_by_name(array, cast_type, &cast_options)?;
297 Ok(ColumnarValue::Array(casted))
298 }
299 ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
300 scalar.cast_to_with_options(cast_type, &cast_options)?,
301 )),
302 }
303 }
304}
305
306fn cast_array_by_name(
307 array: &ArrayRef,
308 cast_type: &DataType,
309 cast_options: &CastOptions<'static>,
310) -> Result<ArrayRef> {
311 if array.data_type() == cast_type {
313 return Ok(Arc::clone(array));
314 }
315
316 if datafusion_common::nested_struct::requires_nested_struct_cast(
317 array.data_type(),
318 cast_type,
319 ) {
320 datafusion_common::nested_struct::cast_column(array, cast_type, cast_options)
321 } else {
322 ensure_date_array_timestamp_bounds(array, cast_type)?;
323 Ok(kernels::cast::cast_with_options(
324 array,
325 cast_type,
326 cast_options,
327 )?)
328 }
329}
330
331fn ensure_date_array_timestamp_bounds(
332 array: &ArrayRef,
333 cast_type: &DataType,
334) -> Result<()> {
335 let source_type = array.data_type().clone();
336 let Some(multiplier) = date_to_timestamp_multiplier(&source_type, cast_type) else {
337 return Ok(());
338 };
339
340 if multiplier <= 1 {
341 return Ok(());
342 }
343
344 let (min_val, max_val): (Option<i64>, Option<i64>) = match &source_type {
346 DataType::Date32 => {
347 let arr = array
348 .as_any()
349 .downcast_ref::<Date32Array>()
350 .ok_or_else(|| {
351 internal_datafusion_err!(
352 "Expected Date32Array but found {}",
353 array.data_type()
354 )
355 })?;
356 (min(arr).map(|v| v as i64), max(arr).map(|v| v as i64))
357 }
358 DataType::Date64 => {
359 let arr = array
360 .as_any()
361 .downcast_ref::<Date64Array>()
362 .ok_or_else(|| {
363 internal_datafusion_err!(
364 "Expected Date64Array but found {}",
365 array.data_type()
366 )
367 })?;
368 (min(arr), max(arr))
369 }
370 _ => return Ok(()), };
372
373 if let Some(min) = min_val {
375 ensure_timestamp_in_bounds(min, multiplier, &source_type, cast_type)?;
376 }
377 if let Some(max) = max_val {
378 ensure_timestamp_in_bounds(max, multiplier, &source_type, cast_type)?;
379 }
380
381 Ok(())
382}
383
384impl fmt::Display for ColumnarValue {
386 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
387 let formatted = match self {
388 ColumnarValue::Array(array) => {
389 pretty_format_columns("ColumnarValue(ArrayRef)", &[Arc::clone(array)])
390 }
391 ColumnarValue::Scalar(_) => {
392 if let Ok(array) = self.to_array(1) {
393 pretty_format_columns("ColumnarValue(ScalarValue)", &[array])
394 } else {
395 return write!(f, "Error formatting columnar value");
396 }
397 }
398 };
399
400 if let Ok(formatted) = formatted {
401 write!(f, "{formatted}")
402 } else {
403 write!(f, "Error formatting columnar value")
404 }
405 }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Date64Array, Int32Array, StructArray},
        datatypes::{Field, Fields, TimeUnit},
    };

    #[test]
    fn into_array_of_size() {
        // An array whose length matches is returned unchanged.
        let arr = make_array(1, 3);
        let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
        assert_eq!(&arr_columnar_value.into_array_of_size(3).unwrap(), &arr);

        // A scalar is expanded to the requested number of rows.
        let scalar_columnar_value = ColumnarValue::Scalar(ScalarValue::Int32(Some(42)));
        let expected_array = make_array(42, 100);
        assert_eq!(
            &scalar_columnar_value.into_array_of_size(100).unwrap(),
            &expected_array
        );

        // An array whose length does not match is rejected.
        let arr = make_array(1, 3);
        let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
        let result = arr_columnar_value.into_array_of_size(5);
        let err = result.unwrap_err();
        assert!(
            err.to_string().starts_with(
                "Internal error: Array length 3 does not match expected length 5"
            ),
            "Found: {err}"
        );
    }

    #[test]
    fn values_to_arrays() {
        let cases = vec![
            // No arguments at all.
            TestCase {
                input: vec![],
                expected: vec![],
            },
            // A single array.
            TestCase {
                input: vec![ColumnarValue::Array(make_array(1, 3))],
                expected: vec![make_array(1, 3)],
            },
            // Two arrays of the same length.
            TestCase {
                input: vec![
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Array(make_array(2, 3)),
                ],
                expected: vec![make_array(1, 3), make_array(2, 3)],
            },
            // Array followed by a scalar: the scalar takes the array's length.
            TestCase {
                input: vec![
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                ],
                expected: vec![make_array(1, 3), make_array(100, 3)],
            },
            // Scalar followed by an array: argument order does not matter.
            TestCase {
                input: vec![
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                    ColumnarValue::Array(make_array(1, 3)),
                ],
                expected: vec![make_array(100, 3), make_array(1, 3)],
            },
            // Several scalars around one array.
            TestCase {
                input: vec![
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(200))),
                ],
                expected: vec![
                    make_array(100, 3),
                    make_array(1, 3),
                    make_array(200, 3),
                ],
            },
        ];
        for case in cases {
            case.run();
        }
    }

    #[test]
    #[should_panic(
        expected = "Arguments has mixed length. Expected length: 3, found length: 4"
    )]
    fn values_to_arrays_mixed_length() {
        ColumnarValue::values_to_arrays(&[
            ColumnarValue::Array(make_array(1, 3)),
            ColumnarValue::Array(make_array(2, 4)),
        ])
        .unwrap();
    }

    #[test]
    #[should_panic(
        expected = "Arguments has mixed length. Expected length: 3, found length: 7"
    )]
    fn values_to_arrays_mixed_length_and_scalar() {
        ColumnarValue::values_to_arrays(&[
            ColumnarValue::Array(make_array(1, 3)),
            ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
            ColumnarValue::Array(make_array(2, 7)),
        ])
        .unwrap();
    }

    /// One `values_to_arrays` scenario: the arguments and the arrays expected
    /// back.
    struct TestCase {
        input: Vec<ColumnarValue>,
        expected: Vec<ArrayRef>,
    }

    impl TestCase {
        /// Runs `values_to_arrays` on `input` and asserts the result equals
        /// `expected`.
        fn run(self) {
            let Self { input, expected } = self;

            assert_eq!(
                ColumnarValue::values_to_arrays(&input).unwrap(),
                expected,
                "\ninput: {input:?}\nexpected: {expected:?}"
            );
        }
    }

    /// Builds an `Int32Array` holding `len` copies of `val`.
    fn make_array(val: i32, len: usize) -> ArrayRef {
        Arc::new(Int32Array::from(vec![val; len]))
    }

    #[test]
    fn test_display_scalar() {
        let column = ColumnarValue::from(ScalarValue::from("foo"));
        // Cells are padded to the column width, so the data row must be as
        // wide as the border lines.
        assert_eq!(
            column.to_string(),
            concat!(
                "+----------------------------+\n",
                "| ColumnarValue(ScalarValue) |\n",
                "+----------------------------+\n",
                "| foo                        |\n",
                "+----------------------------+"
            )
        );
    }

    #[test]
    fn test_display_array() {
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(vec![1, 2, 3]));
        let column = ColumnarValue::from(array);
        // Cells are padded to the column width, so every data row must be as
        // wide as the border lines.
        assert_eq!(
            column.to_string(),
            concat!(
                "+-------------------------+\n",
                "| ColumnarValue(ArrayRef) |\n",
                "+-------------------------+\n",
                "| 1                       |\n",
                "| 2                       |\n",
                "| 3                       |\n",
                "+-------------------------+"
            )
        );
    }

    #[test]
    fn cast_struct_by_field_name() {
        // Source and target hold the same fields in a different order.
        let source_fields = Fields::from(vec![
            Field::new("b", DataType::Int32, true),
            Field::new("a", DataType::Int32, true),
        ]);

        let target_fields = Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]);

        // Columns follow the source order: b = 3, a = 4.
        let struct_array = StructArray::new(
            source_fields,
            vec![
                Arc::new(Int32Array::from(vec![Some(3)])),
                Arc::new(Int32Array::from(vec![Some(4)])),
            ],
            None,
        );

        let value = ColumnarValue::Array(Arc::new(struct_array));
        let casted = value
            .cast_to(&DataType::Struct(target_fields.clone()), None)
            .expect("struct cast should succeed");

        let ColumnarValue::Array(arr) = casted else {
            panic!("expected array after cast");
        };

        let struct_array = arr
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("expected StructArray");

        let field_a = struct_array
            .column_by_name("a")
            .expect("expected field a in cast result");
        let field_b = struct_array
            .column_by_name("b")
            .expect("expected field b in cast result");

        // Values must follow their field names, not their positions.
        assert_eq!(
            field_a
                .as_any()
                .downcast_ref::<Int32Array>()
                .expect("expected Int32 array")
                .value(0),
            4
        );
        assert_eq!(
            field_b
                .as_any()
                .downcast_ref::<Int32Array>()
                .expect("expected Int32 array")
                .value(0),
            3
        );
    }

    #[test]
    fn cast_struct_missing_field_inserts_nulls() {
        let source_fields = Fields::from(vec![Field::new("a", DataType::Int32, true)]);

        // The target adds a field "b" that the source does not have.
        let target_fields = Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]);

        let struct_array = StructArray::new(
            source_fields,
            vec![Arc::new(Int32Array::from(vec![Some(5)]))],
            None,
        );

        let value = ColumnarValue::Array(Arc::new(struct_array));
        let casted = value
            .cast_to(&DataType::Struct(target_fields.clone()), None)
            .expect("struct cast should succeed");

        let ColumnarValue::Array(arr) = casted else {
            panic!("expected array after cast");
        };

        let struct_array = arr
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("expected StructArray");

        let field_b = struct_array
            .column_by_name("b")
            .expect("expected missing field to be added");

        assert!(field_b.is_null(0));
    }

    #[test]
    fn cast_date64_array_to_timestamp_overflow() {
        // The smallest positive value whose product with 1_000_000 no longer
        // fits in an i64 (presumably the Date64 -> nanosecond multiplier).
        let overflow_value = i64::MAX / 1_000_000 + 1;
        let array: ArrayRef = Arc::new(Date64Array::from(vec![Some(overflow_value)]));
        let value = ColumnarValue::Array(array);
        let result =
            value.cast_to(&DataType::Timestamp(TimeUnit::Nanosecond, None), None);
        let err = result.expect_err("expected overflow to be detected");
        assert!(
            err.to_string()
                .contains("converted value exceeds the representable i64 range"),
            "unexpected error: {err}"
        );
    }
}