1use arrow::{
21 array::{Array, ArrayRef, Date32Array, Date64Array, NullArray},
22 compute::{CastOptions, kernels, max, min},
23 datatypes::{DataType, Field},
24 util::pretty::pretty_format_columns,
25};
26use datafusion_common::internal_datafusion_err;
27use datafusion_common::{
28 Result, ScalarValue,
29 format::DEFAULT_CAST_OPTIONS,
30 internal_err,
31 scalar::{date_to_timestamp_multiplier, ensure_timestamp_in_bounds},
32};
33use std::fmt;
34use std::sync::Arc;
35
36#[derive(Clone, Debug)]
96pub enum ColumnarValue {
97 Array(ArrayRef),
99 Scalar(ScalarValue),
101}
102
103impl From<ArrayRef> for ColumnarValue {
104 fn from(value: ArrayRef) -> Self {
105 ColumnarValue::Array(value)
106 }
107}
108
109impl From<ScalarValue> for ColumnarValue {
110 fn from(value: ScalarValue) -> Self {
111 ColumnarValue::Scalar(value)
112 }
113}
114
115impl ColumnarValue {
116 pub fn data_type(&self) -> DataType {
117 match self {
118 ColumnarValue::Array(array_value) => array_value.data_type().clone(),
119 ColumnarValue::Scalar(scalar_value) => scalar_value.data_type(),
120 }
121 }
122
123 pub fn into_array(self, num_rows: usize) -> Result<ArrayRef> {
137 Ok(match self {
138 ColumnarValue::Array(array) => array,
139 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
140 })
141 }
142
143 pub fn into_array_of_size(self, num_rows: usize) -> Result<ArrayRef> {
157 match self {
158 ColumnarValue::Array(array) => {
159 if array.len() == num_rows {
160 Ok(array)
161 } else {
162 internal_err!(
163 "Array length {} does not match expected length {}",
164 array.len(),
165 num_rows
166 )
167 }
168 }
169 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
170 }
171 }
172
173 pub fn to_array(&self, num_rows: usize) -> Result<ArrayRef> {
187 Ok(match self {
188 ColumnarValue::Array(array) => Arc::clone(array),
189 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
190 })
191 }
192
193 pub fn to_array_of_size(&self, num_rows: usize) -> Result<ArrayRef> {
207 match self {
208 ColumnarValue::Array(array) => {
209 if array.len() == num_rows {
210 Ok(Arc::clone(array))
211 } else {
212 internal_err!(
213 "Array length {} does not match expected length {}",
214 array.len(),
215 num_rows
216 )
217 }
218 }
219 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
220 }
221 }
222
223 pub fn create_null_array(num_rows: usize) -> Self {
226 ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
227 }
228
229 pub fn values_to_arrays(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
244 if args.is_empty() {
245 return Ok(vec![]);
246 }
247
248 let mut array_len = None;
249 for arg in args {
250 array_len = match (arg, array_len) {
251 (ColumnarValue::Array(a), None) => Some(a.len()),
252 (ColumnarValue::Array(a), Some(array_len)) => {
253 if array_len == a.len() {
254 Some(array_len)
255 } else {
256 return internal_err!(
257 "Arguments has mixed length. Expected length: {array_len}, found length: {}",
258 a.len()
259 );
260 }
261 }
262 (ColumnarValue::Scalar(_), array_len) => array_len,
263 }
264 }
265
266 let inferred_length = array_len.unwrap_or(1);
268
269 let args = args
270 .iter()
271 .map(|arg| arg.to_array(inferred_length))
272 .collect::<Result<Vec<_>>>()?;
273
274 Ok(args)
275 }
276
277 pub fn cast_to(
289 &self,
290 cast_type: &DataType,
291 cast_options: Option<&CastOptions<'static>>,
292 ) -> Result<ColumnarValue> {
293 let cast_options = cast_options.cloned().unwrap_or(DEFAULT_CAST_OPTIONS);
294 match self {
295 ColumnarValue::Array(array) => {
296 let casted = cast_array_by_name(array, cast_type, &cast_options)?;
297 Ok(ColumnarValue::Array(casted))
298 }
299 ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
300 scalar.cast_to_with_options(cast_type, &cast_options)?,
301 )),
302 }
303 }
304}
305
306fn cast_array_by_name(
307 array: &ArrayRef,
308 cast_type: &DataType,
309 cast_options: &CastOptions<'static>,
310) -> Result<ArrayRef> {
311 if array.data_type() == cast_type {
313 return Ok(Arc::clone(array));
314 }
315
316 match cast_type {
317 DataType::Struct(_) => {
318 let target_field = Field::new("_", cast_type.clone(), true);
320 datafusion_common::nested_struct::cast_column(
321 array,
322 &target_field,
323 cast_options,
324 )
325 }
326 _ => {
327 ensure_date_array_timestamp_bounds(array, cast_type)?;
328 Ok(kernels::cast::cast_with_options(
329 array,
330 cast_type,
331 cast_options,
332 )?)
333 }
334 }
335}
336
337fn ensure_date_array_timestamp_bounds(
338 array: &ArrayRef,
339 cast_type: &DataType,
340) -> Result<()> {
341 let source_type = array.data_type().clone();
342 let Some(multiplier) = date_to_timestamp_multiplier(&source_type, cast_type) else {
343 return Ok(());
344 };
345
346 if multiplier <= 1 {
347 return Ok(());
348 }
349
350 let (min_val, max_val): (Option<i64>, Option<i64>) = match &source_type {
352 DataType::Date32 => {
353 let arr = array
354 .as_any()
355 .downcast_ref::<Date32Array>()
356 .ok_or_else(|| {
357 internal_datafusion_err!(
358 "Expected Date32Array but found {}",
359 array.data_type()
360 )
361 })?;
362 (min(arr).map(|v| v as i64), max(arr).map(|v| v as i64))
363 }
364 DataType::Date64 => {
365 let arr = array
366 .as_any()
367 .downcast_ref::<Date64Array>()
368 .ok_or_else(|| {
369 internal_datafusion_err!(
370 "Expected Date64Array but found {}",
371 array.data_type()
372 )
373 })?;
374 (min(arr), max(arr))
375 }
376 _ => return Ok(()), };
378
379 if let Some(min) = min_val {
381 ensure_timestamp_in_bounds(min, multiplier, &source_type, cast_type)?;
382 }
383 if let Some(max) = max_val {
384 ensure_timestamp_in_bounds(max, multiplier, &source_type, cast_type)?;
385 }
386
387 Ok(())
388}
389
390impl fmt::Display for ColumnarValue {
392 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
393 let formatted = match self {
394 ColumnarValue::Array(array) => {
395 pretty_format_columns("ColumnarValue(ArrayRef)", &[Arc::clone(array)])
396 }
397 ColumnarValue::Scalar(_) => {
398 if let Ok(array) = self.to_array(1) {
399 pretty_format_columns("ColumnarValue(ScalarValue)", &[array])
400 } else {
401 return write!(f, "Error formatting columnar value");
402 }
403 }
404 };
405
406 if let Ok(formatted) = formatted {
407 write!(f, "{formatted}")
408 } else {
409 write!(f, "Error formatting columnar value")
410 }
411 }
412}
413
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Date64Array, Int32Array, StructArray},
        datatypes::{Field, Fields, TimeUnit},
    };

    #[test]
    fn into_array_of_size() {
        // An array whose length matches is returned unchanged.
        let arr = make_array(1, 3);
        let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
        assert_eq!(&arr_columnar_value.into_array_of_size(3).unwrap(), &arr);

        // A scalar is expanded to the requested number of rows.
        let scalar_columnar_value = ColumnarValue::Scalar(ScalarValue::Int32(Some(42)));
        let expected_array = make_array(42, 100);
        assert_eq!(
            &scalar_columnar_value.into_array_of_size(100).unwrap(),
            &expected_array
        );

        // An array whose length differs from the request is an internal error.
        let arr = make_array(1, 3);
        let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
        let result = arr_columnar_value.into_array_of_size(5);
        let err = result.unwrap_err();
        assert!(
            err.to_string().starts_with(
                "Internal error: Array length 3 does not match expected length 5"
            ),
            "Found: {err}"
        );
    }

    #[test]
    fn values_to_arrays() {
        let cases = vec![
            // empty input
            TestCase {
                input: vec![],
                expected: vec![],
            },
            // a single array is passed through
            TestCase {
                input: vec![ColumnarValue::Array(make_array(1, 3))],
                expected: vec![make_array(1, 3)],
            },
            // multiple arrays of equal length
            TestCase {
                input: vec![
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Array(make_array(2, 3)),
                ],
                expected: vec![make_array(1, 3), make_array(2, 3)],
            },
            // array followed by scalar: scalar expands to the array length
            TestCase {
                input: vec![
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                ],
                expected: vec![make_array(1, 3), make_array(100, 3)],
            },
            // scalar followed by array: length is still taken from the array
            TestCase {
                input: vec![
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                    ColumnarValue::Array(make_array(1, 3)),
                ],
                expected: vec![make_array(100, 3), make_array(1, 3)],
            },
            // scalars on both sides of an array
            TestCase {
                input: vec![
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(200))),
                ],
                expected: vec![make_array(100, 3), make_array(1, 3), make_array(200, 3)],
            },
        ];
        for case in cases {
            case.run();
        }
    }

    #[test]
    #[should_panic(
        expected = "Arguments has mixed length. Expected length: 3, found length: 4"
    )]
    fn values_to_arrays_mixed_length() {
        ColumnarValue::values_to_arrays(&[
            ColumnarValue::Array(make_array(1, 3)),
            ColumnarValue::Array(make_array(2, 4)),
        ])
        .unwrap();
    }

    #[test]
    #[should_panic(
        expected = "Arguments has mixed length. Expected length: 3, found length: 7"
    )]
    fn values_to_arrays_mixed_length_and_scalar() {
        ColumnarValue::values_to_arrays(&[
            ColumnarValue::Array(make_array(1, 3)),
            ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
            ColumnarValue::Array(make_array(2, 7)),
        ])
        .unwrap();
    }

    struct TestCase {
        input: Vec<ColumnarValue>,
        expected: Vec<ArrayRef>,
    }

    impl TestCase {
        fn run(self) {
            let Self { input, expected } = self;

            assert_eq!(
                ColumnarValue::values_to_arrays(&input).unwrap(),
                expected,
                "\ninput: {input:?}\nexpected: {expected:?}"
            );
        }
    }

    /// Builds an Int32 array of `len` copies of `val`.
    fn make_array(val: i32, len: usize) -> ArrayRef {
        Arc::new(Int32Array::from(vec![val; len]))
    }

    #[test]
    fn test_display_scalar() {
        let column = ColumnarValue::from(ScalarValue::from("foo"));
        // Rows are padded to the header's width so every line is the same
        // length as the borders.
        assert_eq!(
            column.to_string(),
            concat!(
                "+----------------------------+\n",
                "| ColumnarValue(ScalarValue) |\n",
                "+----------------------------+\n",
                "| foo                        |\n",
                "+----------------------------+"
            )
        );
    }

    #[test]
    fn test_display_array() {
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(vec![1, 2, 3]));
        let column = ColumnarValue::from(array);
        // Rows are padded to the header's width so every line is the same
        // length as the borders.
        assert_eq!(
            column.to_string(),
            concat!(
                "+-------------------------+\n",
                "| ColumnarValue(ArrayRef) |\n",
                "+-------------------------+\n",
                "| 1                       |\n",
                "| 2                       |\n",
                "| 3                       |\n",
                "+-------------------------+"
            )
        );
    }

    #[test]
    fn cast_struct_by_field_name() {
        // Source and target declare the same fields in opposite order.
        let source_fields = Fields::from(vec![
            Field::new("b", DataType::Int32, true),
            Field::new("a", DataType::Int32, true),
        ]);

        let target_fields = Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]);

        let struct_array = StructArray::new(
            source_fields,
            vec![
                Arc::new(Int32Array::from(vec![Some(3)])),
                Arc::new(Int32Array::from(vec![Some(4)])),
            ],
            None,
        );

        let value = ColumnarValue::Array(Arc::new(struct_array));
        let casted = value
            .cast_to(&DataType::Struct(target_fields), None)
            .expect("struct cast should succeed");

        let ColumnarValue::Array(arr) = casted else {
            panic!("expected array after cast");
        };

        let struct_array = arr
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("expected StructArray");

        let field_a = struct_array
            .column_by_name("a")
            .expect("expected field a in cast result");
        let field_b = struct_array
            .column_by_name("b")
            .expect("expected field b in cast result");

        // Values follow their field names, not their positions.
        assert_eq!(
            field_a
                .as_any()
                .downcast_ref::<Int32Array>()
                .expect("expected Int32 array")
                .value(0),
            4
        );
        assert_eq!(
            field_b
                .as_any()
                .downcast_ref::<Int32Array>()
                .expect("expected Int32 array")
                .value(0),
            3
        );
    }

    #[test]
    fn cast_struct_missing_field_inserts_nulls() {
        let source_fields = Fields::from(vec![Field::new("a", DataType::Int32, true)]);

        let target_fields = Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]);

        let struct_array = StructArray::new(
            source_fields,
            vec![Arc::new(Int32Array::from(vec![Some(5)]))],
            None,
        );

        let value = ColumnarValue::Array(Arc::new(struct_array));
        let casted = value
            .cast_to(&DataType::Struct(target_fields), None)
            .expect("struct cast should succeed");

        let ColumnarValue::Array(arr) = casted else {
            panic!("expected array after cast");
        };

        let struct_array = arr
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("expected StructArray");

        let field_b = struct_array
            .column_by_name("b")
            .expect("expected missing field to be added");

        assert!(field_b.is_null(0));
    }

    #[test]
    fn cast_date64_array_to_timestamp_overflow() {
        // Smallest millisecond value whose nanosecond form exceeds i64::MAX.
        let overflow_value = i64::MAX / 1_000_000 + 1;
        let array: ArrayRef = Arc::new(Date64Array::from(vec![Some(overflow_value)]));
        let value = ColumnarValue::Array(array);
        let result =
            value.cast_to(&DataType::Timestamp(TimeUnit::Nanosecond, None), None);
        let err = result.expect_err("expected overflow to be detected");
        assert!(
            err.to_string()
                .contains("converted value exceeds the representable i64 range"),
            "unexpected error: {err}"
        );
    }
}