Skip to main content

datafusion_expr_common/
columnar_value.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ColumnarValue`] represents the result of evaluating an expression.
19
20use arrow::{
21    array::{Array, ArrayRef, Date32Array, Date64Array, NullArray},
22    compute::{CastOptions, kernels, max, min},
23    datatypes::{DataType, Field},
24    util::pretty::pretty_format_columns,
25};
26use datafusion_common::internal_datafusion_err;
27use datafusion_common::{
28    Result, ScalarValue,
29    format::DEFAULT_CAST_OPTIONS,
30    internal_err,
31    scalar::{date_to_timestamp_multiplier, ensure_timestamp_in_bounds},
32};
33use std::fmt;
34use std::sync::Arc;
35
36/// The result of evaluating an expression.
37///
38/// [`ColumnarValue::Scalar`] represents a single value repeated any number of
39/// times. This is an important performance optimization for handling values
40/// that do not change across rows.
41///
42/// [`ColumnarValue::Array`] represents a column of data, stored as an  Arrow
43/// [`ArrayRef`]
44///
45/// A slice of `ColumnarValue`s logically represents a table, with each column
46/// having the same number of rows. This means that all `Array`s are the same
47/// length.
48///
49/// # Example
50///
51/// A `ColumnarValue::Array` with an array of 5 elements and a
52/// `ColumnarValue::Scalar` with the value 100
53///
54/// ```text
55/// ┌──────────────┐
56/// │ ┌──────────┐ │
57/// │ │   "A"    │ │
58/// │ ├──────────┤ │
59/// │ │   "B"    │ │
60/// │ ├──────────┤ │
61/// │ │   "C"    │ │
62/// │ ├──────────┤ │
63/// │ │   "D"    │ │        ┌──────────────┐
64/// │ ├──────────┤ │        │ ┌──────────┐ │
65/// │ │   "E"    │ │        │ │   100    │ │
66/// │ └──────────┘ │        │ └──────────┘ │
67/// └──────────────┘        └──────────────┘
68///
69///  ColumnarValue::        ColumnarValue::
70///       Array                 Scalar
71/// ```
72///
73/// Logically represents the following table:
74///
75/// | Column 1| Column 2 |
76/// | ------- | -------- |
77/// | A | 100 |
78/// | B | 100 |
79/// | C | 100 |
80/// | D | 100 |
81/// | E | 100 |
82///
83/// # Performance Notes
84///
85/// When implementing functions or operators, it is important to consider the
86/// performance implications of handling scalar values.
87///
88/// Because all functions must handle [`ArrayRef`], it is
89/// convenient to convert [`ColumnarValue::Scalar`]s using
90/// [`Self::into_array`]. For example,  [`ColumnarValue::values_to_arrays`]
91/// converts multiple columnar values into arrays of the same length.
92///
93/// However, it is often much more performant to provide a different,
94/// implementation that handles scalar values differently
95#[derive(Clone, Debug)]
96pub enum ColumnarValue {
97    /// Array of values
98    Array(ArrayRef),
99    /// A single value
100    Scalar(ScalarValue),
101}
102
103impl From<ArrayRef> for ColumnarValue {
104    fn from(value: ArrayRef) -> Self {
105        ColumnarValue::Array(value)
106    }
107}
108
109impl From<ScalarValue> for ColumnarValue {
110    fn from(value: ScalarValue) -> Self {
111        ColumnarValue::Scalar(value)
112    }
113}
114
115impl ColumnarValue {
116    pub fn data_type(&self) -> DataType {
117        match self {
118            ColumnarValue::Array(array_value) => array_value.data_type().clone(),
119            ColumnarValue::Scalar(scalar_value) => scalar_value.data_type(),
120        }
121    }
122
123    /// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
124    /// number of rows  by repeating the same scalar multiple times,
125    /// which is not as efficient as handling the scalar directly.
126    /// [`Self::Array`] will just be returned as is.
127    ///
128    /// See [`Self::into_array_of_size`] if you need to validate the length of the output array.
129    ///
130    /// See [`Self::values_to_arrays`] to convert multiple columnar values into
131    /// arrays of the same length.
132    ///
133    /// # Errors
134    ///
135    /// Errors if `self` is a Scalar that fails to be converted into an array of size
136    pub fn into_array(self, num_rows: usize) -> Result<ArrayRef> {
137        Ok(match self {
138            ColumnarValue::Array(array) => array,
139            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
140        })
141    }
142
143    /// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
144    /// number of rows. [`Self::Scalar`] is converted by repeating the same
145    /// scalar multiple times which is not as efficient as handling the scalar
146    /// directly.
147    /// This validates that if this is [`Self::Array`], it has the expected length.
148    ///
149    /// See [`Self::values_to_arrays`] to convert multiple columnar values into
150    /// arrays of the same length.
151    ///
152    /// # Errors
153    ///
154    /// Errors if `self` is a Scalar that fails to be converted into an array of size or
155    /// if the array length does not match the expected length
156    pub fn into_array_of_size(self, num_rows: usize) -> Result<ArrayRef> {
157        match self {
158            ColumnarValue::Array(array) => {
159                if array.len() == num_rows {
160                    Ok(array)
161                } else {
162                    internal_err!(
163                        "Array length {} does not match expected length {}",
164                        array.len(),
165                        num_rows
166                    )
167                }
168            }
169            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
170        }
171    }
172
173    /// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
174    /// number of rows  by repeating the same scalar multiple times,
175    /// which is not as efficient as handling the scalar directly.
176    /// [`Self::Array`] will just be returned as is.
177    ///
178    /// See [`Self::to_array_of_size`] if you need to validate the length of the output array.
179    ///
180    /// See [`Self::values_to_arrays`] to convert multiple columnar values into
181    /// arrays of the same length.
182    ///
183    /// # Errors
184    ///
185    /// Errors if `self` is a Scalar that fails to be converted into an array of size
186    pub fn to_array(&self, num_rows: usize) -> Result<ArrayRef> {
187        Ok(match self {
188            ColumnarValue::Array(array) => Arc::clone(array),
189            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
190        })
191    }
192
193    /// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
194    /// number of rows. [`Self::Scalar`] is converted by repeating the same
195    /// scalar multiple times which is not as efficient as handling the scalar
196    /// directly.
197    /// This validates that if this is [`Self::Array`], it has the expected length.
198    ///
199    /// See [`Self::values_to_arrays`] to convert multiple columnar values into
200    /// arrays of the same length.
201    ///
202    /// # Errors
203    ///
204    /// Errors if `self` is a Scalar that fails to be converted into an array of size or
205    /// if the array length does not match the expected length
206    pub fn to_array_of_size(&self, num_rows: usize) -> Result<ArrayRef> {
207        match self {
208            ColumnarValue::Array(array) => {
209                if array.len() == num_rows {
210                    Ok(Arc::clone(array))
211                } else {
212                    internal_err!(
213                        "Array length {} does not match expected length {}",
214                        array.len(),
215                        num_rows
216                    )
217                }
218            }
219            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
220        }
221    }
222
223    /// Null columnar values are implemented as a null array in order to pass batch
224    /// num_rows
225    pub fn create_null_array(num_rows: usize) -> Self {
226        ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
227    }
228
229    /// Converts  [`ColumnarValue`]s to [`ArrayRef`]s with the same length.
230    ///
231    /// # Performance Note
232    ///
233    /// This function expands any [`ScalarValue`] to an array. This expansion
234    /// permits using a single function in terms of arrays, but it can be
235    /// inefficient compared to handling the scalar value directly.
236    ///
237    /// Thus, it is recommended to provide specialized implementations for
238    /// scalar values if performance is a concern.
239    ///
240    /// # Errors
241    ///
242    /// If there are multiple array arguments that have different lengths
243    pub fn values_to_arrays(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
244        if args.is_empty() {
245            return Ok(vec![]);
246        }
247
248        let mut array_len = None;
249        for arg in args {
250            array_len = match (arg, array_len) {
251                (ColumnarValue::Array(a), None) => Some(a.len()),
252                (ColumnarValue::Array(a), Some(array_len)) => {
253                    if array_len == a.len() {
254                        Some(array_len)
255                    } else {
256                        return internal_err!(
257                            "Arguments has mixed length. Expected length: {array_len}, found length: {}",
258                            a.len()
259                        );
260                    }
261                }
262                (ColumnarValue::Scalar(_), array_len) => array_len,
263            }
264        }
265
266        // If array_len is none, it means there are only scalars, so make a 1 element array
267        let inferred_length = array_len.unwrap_or(1);
268
269        let args = args
270            .iter()
271            .map(|arg| arg.to_array(inferred_length))
272            .collect::<Result<Vec<_>>>()?;
273
274        Ok(args)
275    }
276
277    /// Cast this [ColumnarValue] to the specified `DataType`
278    ///
279    /// # Struct Casting Behavior
280    ///
281    /// When casting struct types, fields are matched **by name** rather than position:
282    /// - Source fields are matched to target fields using case-sensitive name comparison
283    /// - Fields are reordered to match the target schema
284    /// - Missing target fields are filled with null arrays
285    /// - Extra source fields are ignored
286    ///
287    /// For non-struct types, uses Arrow's standard positional casting.
288    pub fn cast_to(
289        &self,
290        cast_type: &DataType,
291        cast_options: Option<&CastOptions<'static>>,
292    ) -> Result<ColumnarValue> {
293        let cast_options = cast_options.cloned().unwrap_or(DEFAULT_CAST_OPTIONS);
294        match self {
295            ColumnarValue::Array(array) => {
296                let casted = cast_array_by_name(array, cast_type, &cast_options)?;
297                Ok(ColumnarValue::Array(casted))
298            }
299            ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
300                scalar.cast_to_with_options(cast_type, &cast_options)?,
301            )),
302        }
303    }
304}
305
306fn cast_array_by_name(
307    array: &ArrayRef,
308    cast_type: &DataType,
309    cast_options: &CastOptions<'static>,
310) -> Result<ArrayRef> {
311    // If types are already equal, no cast needed
312    if array.data_type() == cast_type {
313        return Ok(Arc::clone(array));
314    }
315
316    match cast_type {
317        DataType::Struct(_) => {
318            // Field name is unused; only the struct's inner field names matter
319            let target_field = Field::new("_", cast_type.clone(), true);
320            datafusion_common::nested_struct::cast_column(
321                array,
322                &target_field,
323                cast_options,
324            )
325        }
326        _ => {
327            ensure_date_array_timestamp_bounds(array, cast_type)?;
328            Ok(kernels::cast::cast_with_options(
329                array,
330                cast_type,
331                cast_options,
332            )?)
333        }
334    }
335}
336
337fn ensure_date_array_timestamp_bounds(
338    array: &ArrayRef,
339    cast_type: &DataType,
340) -> Result<()> {
341    let source_type = array.data_type().clone();
342    let Some(multiplier) = date_to_timestamp_multiplier(&source_type, cast_type) else {
343        return Ok(());
344    };
345
346    if multiplier <= 1 {
347        return Ok(());
348    }
349
350    // Use compute kernels to find min/max instead of iterating all elements
351    let (min_val, max_val): (Option<i64>, Option<i64>) = match &source_type {
352        DataType::Date32 => {
353            let arr = array
354                .as_any()
355                .downcast_ref::<Date32Array>()
356                .ok_or_else(|| {
357                    internal_datafusion_err!(
358                        "Expected Date32Array but found {}",
359                        array.data_type()
360                    )
361                })?;
362            (min(arr).map(|v| v as i64), max(arr).map(|v| v as i64))
363        }
364        DataType::Date64 => {
365            let arr = array
366                .as_any()
367                .downcast_ref::<Date64Array>()
368                .ok_or_else(|| {
369                    internal_datafusion_err!(
370                        "Expected Date64Array but found {}",
371                        array.data_type()
372                    )
373                })?;
374            (min(arr), max(arr))
375        }
376        _ => return Ok(()), // Not a date type, nothing to do
377    };
378
379    // Only validate the min and max values instead of all elements
380    if let Some(min) = min_val {
381        ensure_timestamp_in_bounds(min, multiplier, &source_type, cast_type)?;
382    }
383    if let Some(max) = max_val {
384        ensure_timestamp_in_bounds(max, multiplier, &source_type, cast_type)?;
385    }
386
387    Ok(())
388}
389
390// Implement Display trait for ColumnarValue
391impl fmt::Display for ColumnarValue {
392    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
393        let formatted = match self {
394            ColumnarValue::Array(array) => {
395                pretty_format_columns("ColumnarValue(ArrayRef)", &[Arc::clone(array)])
396            }
397            ColumnarValue::Scalar(_) => {
398                if let Ok(array) = self.to_array(1) {
399                    pretty_format_columns("ColumnarValue(ScalarValue)", &[array])
400                } else {
401                    return write!(f, "Error formatting columnar value");
402                }
403            }
404        };
405
406        if let Ok(formatted) = formatted {
407            write!(f, "{formatted}")
408        } else {
409            write!(f, "Error formatting columnar value")
410        }
411    }
412}
413
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Date64Array, Int32Array, StructArray},
        datatypes::{Field, Fields, TimeUnit},
    };

    /// Makes an array of length `len` with all elements set to `val`
    fn make_array(val: i32, len: usize) -> ArrayRef {
        let values = std::iter::repeat(val).take(len);
        Arc::new(Int32Array::from_iter_values(values))
    }

    /// Shorthand for an array-backed columnar value built by `make_array`
    fn int_column(val: i32, len: usize) -> ColumnarValue {
        ColumnarValue::Array(make_array(val, len))
    }

    /// Shorthand for a non-null `Int32` scalar columnar value
    fn int_scalar(val: i32) -> ColumnarValue {
        ColumnarValue::Scalar(ScalarValue::Int32(Some(val)))
    }

    /// Casts `source` to a struct with `target_fields` and returns the
    /// resulting struct array, panicking if any step fails
    fn cast_to_struct(source: StructArray, target_fields: Fields) -> StructArray {
        let casted = ColumnarValue::Array(Arc::new(source))
            .cast_to(&DataType::Struct(target_fields), None)
            .expect("struct cast should succeed");

        let ColumnarValue::Array(arr) = casted else {
            panic!("expected array after cast");
        };

        arr.as_any()
            .downcast_ref::<StructArray>()
            .expect("expected StructArray")
            .clone()
    }

    /// Reads row `row` of an `Int32` column
    fn int32_value(column: &ArrayRef, row: usize) -> i32 {
        column
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("expected Int32 array")
            .value(row)
    }

    /// One scenario for `ColumnarValue::values_to_arrays`
    struct TestCase {
        input: Vec<ColumnarValue>,
        expected: Vec<ArrayRef>,
    }

    impl TestCase {
        /// Runs the conversion and compares it with the expected arrays
        fn run(self) {
            let Self { input, expected } = self;
            let actual = ColumnarValue::values_to_arrays(&input).unwrap();

            assert_eq!(
                actual, expected,
                "\ninput: {input:?}\nexpected: {expected:?}"
            );
        }
    }

    #[test]
    fn into_array_of_size() {
        // An array of the requested size is handed back unchanged
        let original = make_array(1, 3);
        let matching = ColumnarValue::Array(Arc::clone(&original));
        assert_eq!(&matching.into_array_of_size(3).unwrap(), &original);

        // A scalar is repeated to the requested size
        let repeated = int_scalar(42).into_array_of_size(100).unwrap();
        assert_eq!(&repeated, &make_array(42, 100));

        // An array of a different size is rejected
        let err = int_column(1, 3).into_array_of_size(5).unwrap_err();
        assert!(
            err.to_string().starts_with(
                "Internal error: Array length 3 does not match expected length 5"
            ),
            "Found: {err}"
        );
    }

    #[test]
    fn values_to_arrays() {
        let cases = vec![
            // empty
            TestCase {
                input: vec![],
                expected: vec![],
            },
            // one array of length 3
            TestCase {
                input: vec![int_column(1, 3)],
                expected: vec![make_array(1, 3)],
            },
            // two arrays length 3
            TestCase {
                input: vec![int_column(1, 3), int_column(2, 3)],
                expected: vec![make_array(1, 3), make_array(2, 3)],
            },
            // array and scalar: the scalar is expanded
            TestCase {
                input: vec![int_column(1, 3), int_scalar(100)],
                expected: vec![make_array(1, 3), make_array(100, 3)],
            },
            // scalar and array: the scalar is expanded
            TestCase {
                input: vec![int_scalar(100), int_column(1, 3)],
                expected: vec![make_array(100, 3), make_array(1, 3)],
            },
            // multiple scalars and array: both scalars are expanded
            TestCase {
                input: vec![int_scalar(100), int_column(1, 3), int_scalar(200)],
                expected: vec![
                    make_array(100, 3),
                    make_array(1, 3),
                    make_array(200, 3),
                ],
            },
        ];

        cases.into_iter().for_each(TestCase::run);
    }

    #[test]
    #[should_panic(
        expected = "Arguments has mixed length. Expected length: 3, found length: 4"
    )]
    fn values_to_arrays_mixed_length() {
        let args = [int_column(1, 3), int_column(2, 4)];
        ColumnarValue::values_to_arrays(&args).unwrap();
    }

    #[test]
    #[should_panic(
        expected = "Arguments has mixed length. Expected length: 3, found length: 7"
    )]
    fn values_to_arrays_mixed_length_and_scalar() {
        let args = [int_column(1, 3), int_scalar(100), int_column(2, 7)];
        ColumnarValue::values_to_arrays(&args).unwrap();
    }

    #[test]
    fn test_display_scalar() {
        let column = ColumnarValue::from(ScalarValue::from("foo"));
        let expected = concat!(
            "+----------------------------+\n",
            "| ColumnarValue(ScalarValue) |\n",
            "+----------------------------+\n",
            "| foo                        |\n",
            "+----------------------------+"
        );
        assert_eq!(column.to_string(), expected);
    }

    #[test]
    fn test_display_array() {
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(vec![1, 2, 3]));
        let column = ColumnarValue::from(array);
        let expected = concat!(
            "+-------------------------+\n",
            "| ColumnarValue(ArrayRef) |\n",
            "+-------------------------+\n",
            "| 1                       |\n",
            "| 2                       |\n",
            "| 3                       |\n",
            "+-------------------------+"
        );
        assert_eq!(column.to_string(), expected);
    }

    #[test]
    fn cast_struct_by_field_name() {
        // Source stores the fields as (b, a); the target wants (a, b)
        let source_fields = Fields::from(vec![
            Field::new("b", DataType::Int32, true),
            Field::new("a", DataType::Int32, true),
        ]);
        let target_fields = Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]);

        let source = StructArray::new(
            source_fields,
            vec![
                Arc::new(Int32Array::from(vec![Some(3)])),
                Arc::new(Int32Array::from(vec![Some(4)])),
            ],
            None,
        );

        let struct_array = cast_to_struct(source, target_fields);

        let field_a = struct_array
            .column_by_name("a")
            .expect("expected field a in cast result");
        let field_b = struct_array
            .column_by_name("b")
            .expect("expected field b in cast result");

        // Values follow their field names, not their source positions
        assert_eq!(int32_value(field_a, 0), 4);
        assert_eq!(int32_value(field_b, 0), 3);
    }

    #[test]
    fn cast_struct_missing_field_inserts_nulls() {
        let source_fields = Fields::from(vec![Field::new("a", DataType::Int32, true)]);
        let target_fields = Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]);

        let source = StructArray::new(
            source_fields,
            vec![Arc::new(Int32Array::from(vec![Some(5)]))],
            None,
        );

        let struct_array = cast_to_struct(source, target_fields);

        let field_b = struct_array
            .column_by_name("b")
            .expect("expected missing field to be added");

        assert!(field_b.is_null(0));
    }

    #[test]
    fn cast_date64_array_to_timestamp_overflow() {
        // One past the largest input that the test expects to convert
        let overflow_value = i64::MAX / 1_000_000 + 1;
        let array: ArrayRef = Arc::new(Date64Array::from(vec![Some(overflow_value)]));
        let target_type = DataType::Timestamp(TimeUnit::Nanosecond, None);

        let err = ColumnarValue::Array(array)
            .cast_to(&target_type, None)
            .expect_err("expected overflow to be detected");

        assert!(
            err.to_string()
                .contains("converted value exceeds the representable i64 range"),
            "unexpected error: {err}"
        );
    }
}