Skip to main content

datafusion_expr_common/
columnar_value.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ColumnarValue`] represents the result of evaluating an expression.
19
20use arrow::{
21    array::{Array, ArrayRef, Date32Array, Date64Array, NullArray},
22    compute::{CastOptions, kernels, max, min},
23    datatypes::DataType,
24    util::pretty::pretty_format_columns,
25};
26use datafusion_common::internal_datafusion_err;
27use datafusion_common::{
28    Result, ScalarValue,
29    format::DEFAULT_CAST_OPTIONS,
30    internal_err,
31    scalar::{date_to_timestamp_multiplier, ensure_timestamp_in_bounds},
32};
33use std::fmt;
34use std::sync::Arc;
35
/// The result of evaluating an expression.
///
/// [`ColumnarValue::Scalar`] represents a single value repeated any number of
/// times. This is an important performance optimization for handling values
/// that do not change across rows.
///
/// [`ColumnarValue::Array`] represents a column of data, stored as an Arrow
/// [`ArrayRef`].
///
/// A slice of `ColumnarValue`s logically represents a table, with each column
/// having the same number of rows. This means that all `Array`s are the same
/// length.
///
/// # Example
///
/// A `ColumnarValue::Array` with an array of 5 elements and a
/// `ColumnarValue::Scalar` with the value 100
///
/// ```text
/// ┌──────────────┐
/// │ ┌──────────┐ │
/// │ │   "A"    │ │
/// │ ├──────────┤ │
/// │ │   "B"    │ │
/// │ ├──────────┤ │
/// │ │   "C"    │ │
/// │ ├──────────┤ │
/// │ │   "D"    │ │        ┌──────────────┐
/// │ ├──────────┤ │        │ ┌──────────┐ │
/// │ │   "E"    │ │        │ │   100    │ │
/// │ └──────────┘ │        │ └──────────┘ │
/// └──────────────┘        └──────────────┘
///
///  ColumnarValue::        ColumnarValue::
///       Array                 Scalar
/// ```
///
/// Logically represents the following table:
///
/// | Column 1| Column 2 |
/// | ------- | -------- |
/// | A | 100 |
/// | B | 100 |
/// | C | 100 |
/// | D | 100 |
/// | E | 100 |
///
/// # Performance Notes
///
/// When implementing functions or operators, it is important to consider the
/// performance implications of handling scalar values.
///
/// Because all functions must handle [`ArrayRef`], it is
/// convenient to convert [`ColumnarValue::Scalar`]s using
/// [`Self::into_array`]. For example, [`ColumnarValue::values_to_arrays`]
/// converts multiple columnar values into arrays of the same length.
///
/// However, it is often much more performant to provide a different
/// implementation that handles scalar values directly instead of expanding
/// them into arrays first.
#[derive(Clone, Debug)]
pub enum ColumnarValue {
    /// A column of values, stored as an Arrow [`ArrayRef`]
    Array(ArrayRef),
    /// A single [`ScalarValue`], logically repeated for every row
    Scalar(ScalarValue),
}
102
103impl From<ArrayRef> for ColumnarValue {
104    fn from(value: ArrayRef) -> Self {
105        ColumnarValue::Array(value)
106    }
107}
108
109impl From<ScalarValue> for ColumnarValue {
110    fn from(value: ScalarValue) -> Self {
111        ColumnarValue::Scalar(value)
112    }
113}
114
115impl ColumnarValue {
116    pub fn data_type(&self) -> DataType {
117        match self {
118            ColumnarValue::Array(array_value) => array_value.data_type().clone(),
119            ColumnarValue::Scalar(scalar_value) => scalar_value.data_type(),
120        }
121    }
122
123    /// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
124    /// number of rows  by repeating the same scalar multiple times,
125    /// which is not as efficient as handling the scalar directly.
126    /// [`Self::Array`] will just be returned as is.
127    ///
128    /// See [`Self::into_array_of_size`] if you need to validate the length of the output array.
129    ///
130    /// See [`Self::values_to_arrays`] to convert multiple columnar values into
131    /// arrays of the same length.
132    ///
133    /// # Errors
134    ///
135    /// Errors if `self` is a Scalar that fails to be converted into an array of size
136    pub fn into_array(self, num_rows: usize) -> Result<ArrayRef> {
137        Ok(match self {
138            ColumnarValue::Array(array) => array,
139            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
140        })
141    }
142
143    /// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
144    /// number of rows. [`Self::Scalar`] is converted by repeating the same
145    /// scalar multiple times which is not as efficient as handling the scalar
146    /// directly.
147    /// This validates that if this is [`Self::Array`], it has the expected length.
148    ///
149    /// See [`Self::values_to_arrays`] to convert multiple columnar values into
150    /// arrays of the same length.
151    ///
152    /// # Errors
153    ///
154    /// Errors if `self` is a Scalar that fails to be converted into an array of size or
155    /// if the array length does not match the expected length
156    pub fn into_array_of_size(self, num_rows: usize) -> Result<ArrayRef> {
157        match self {
158            ColumnarValue::Array(array) => {
159                if array.len() == num_rows {
160                    Ok(array)
161                } else {
162                    internal_err!(
163                        "Array length {} does not match expected length {}",
164                        array.len(),
165                        num_rows
166                    )
167                }
168            }
169            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
170        }
171    }
172
173    /// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
174    /// number of rows  by repeating the same scalar multiple times,
175    /// which is not as efficient as handling the scalar directly.
176    /// [`Self::Array`] will just be returned as is.
177    ///
178    /// See [`Self::to_array_of_size`] if you need to validate the length of the output array.
179    ///
180    /// See [`Self::values_to_arrays`] to convert multiple columnar values into
181    /// arrays of the same length.
182    ///
183    /// # Errors
184    ///
185    /// Errors if `self` is a Scalar that fails to be converted into an array of size
186    pub fn to_array(&self, num_rows: usize) -> Result<ArrayRef> {
187        Ok(match self {
188            ColumnarValue::Array(array) => Arc::clone(array),
189            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
190        })
191    }
192
193    /// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
194    /// number of rows. [`Self::Scalar`] is converted by repeating the same
195    /// scalar multiple times which is not as efficient as handling the scalar
196    /// directly.
197    /// This validates that if this is [`Self::Array`], it has the expected length.
198    ///
199    /// See [`Self::values_to_arrays`] to convert multiple columnar values into
200    /// arrays of the same length.
201    ///
202    /// # Errors
203    ///
204    /// Errors if `self` is a Scalar that fails to be converted into an array of size or
205    /// if the array length does not match the expected length
206    pub fn to_array_of_size(&self, num_rows: usize) -> Result<ArrayRef> {
207        match self {
208            ColumnarValue::Array(array) => {
209                if array.len() == num_rows {
210                    Ok(Arc::clone(array))
211                } else {
212                    internal_err!(
213                        "Array length {} does not match expected length {}",
214                        array.len(),
215                        num_rows
216                    )
217                }
218            }
219            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
220        }
221    }
222
223    /// Null columnar values are implemented as a null array in order to pass batch
224    /// num_rows
225    pub fn create_null_array(num_rows: usize) -> Self {
226        ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
227    }
228
229    /// Converts  [`ColumnarValue`]s to [`ArrayRef`]s with the same length.
230    ///
231    /// # Performance Note
232    ///
233    /// This function expands any [`ScalarValue`] to an array. This expansion
234    /// permits using a single function in terms of arrays, but it can be
235    /// inefficient compared to handling the scalar value directly.
236    ///
237    /// Thus, it is recommended to provide specialized implementations for
238    /// scalar values if performance is a concern.
239    ///
240    /// # Errors
241    ///
242    /// If there are multiple array arguments that have different lengths
243    pub fn values_to_arrays(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
244        if args.is_empty() {
245            return Ok(vec![]);
246        }
247
248        let mut array_len = None;
249        for arg in args {
250            array_len = match (arg, array_len) {
251                (ColumnarValue::Array(a), None) => Some(a.len()),
252                (ColumnarValue::Array(a), Some(array_len)) => {
253                    if array_len == a.len() {
254                        Some(array_len)
255                    } else {
256                        return internal_err!(
257                            "Arguments has mixed length. Expected length: {array_len}, found length: {}",
258                            a.len()
259                        );
260                    }
261                }
262                (ColumnarValue::Scalar(_), array_len) => array_len,
263            }
264        }
265
266        // If array_len is none, it means there are only scalars, so make a 1 element array
267        let inferred_length = array_len.unwrap_or(1);
268
269        let args = args
270            .iter()
271            .map(|arg| arg.to_array(inferred_length))
272            .collect::<Result<Vec<_>>>()?;
273
274        Ok(args)
275    }
276
277    /// Cast this [ColumnarValue] to the specified `DataType`
278    ///
279    /// # Struct Casting Behavior
280    ///
281    /// When casting struct types, fields are matched **by name** rather than position:
282    /// - Source fields are matched to target fields using case-sensitive name comparison
283    /// - Fields are reordered to match the target schema
284    /// - Missing target fields are filled with null arrays
285    /// - Extra source fields are ignored
286    ///
287    /// For non-struct types, uses Arrow's standard positional casting.
288    pub fn cast_to(
289        &self,
290        cast_type: &DataType,
291        cast_options: Option<&CastOptions<'static>>,
292    ) -> Result<ColumnarValue> {
293        let cast_options = cast_options.cloned().unwrap_or(DEFAULT_CAST_OPTIONS);
294        match self {
295            ColumnarValue::Array(array) => {
296                let casted = cast_array_by_name(array, cast_type, &cast_options)?;
297                Ok(ColumnarValue::Array(casted))
298            }
299            ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
300                scalar.cast_to_with_options(cast_type, &cast_options)?,
301            )),
302        }
303    }
304}
305
306fn cast_array_by_name(
307    array: &ArrayRef,
308    cast_type: &DataType,
309    cast_options: &CastOptions<'static>,
310) -> Result<ArrayRef> {
311    // If types are already equal, no cast needed
312    if array.data_type() == cast_type {
313        return Ok(Arc::clone(array));
314    }
315
316    if datafusion_common::nested_struct::requires_nested_struct_cast(
317        array.data_type(),
318        cast_type,
319    ) {
320        datafusion_common::nested_struct::cast_column(array, cast_type, cast_options)
321    } else {
322        ensure_date_array_timestamp_bounds(array, cast_type)?;
323        Ok(kernels::cast::cast_with_options(
324            array,
325            cast_type,
326            cast_options,
327        )?)
328    }
329}
330
331fn ensure_date_array_timestamp_bounds(
332    array: &ArrayRef,
333    cast_type: &DataType,
334) -> Result<()> {
335    let source_type = array.data_type().clone();
336    let Some(multiplier) = date_to_timestamp_multiplier(&source_type, cast_type) else {
337        return Ok(());
338    };
339
340    if multiplier <= 1 {
341        return Ok(());
342    }
343
344    // Use compute kernels to find min/max instead of iterating all elements
345    let (min_val, max_val): (Option<i64>, Option<i64>) = match &source_type {
346        DataType::Date32 => {
347            let arr = array
348                .as_any()
349                .downcast_ref::<Date32Array>()
350                .ok_or_else(|| {
351                    internal_datafusion_err!(
352                        "Expected Date32Array but found {}",
353                        array.data_type()
354                    )
355                })?;
356            (min(arr).map(|v| v as i64), max(arr).map(|v| v as i64))
357        }
358        DataType::Date64 => {
359            let arr = array
360                .as_any()
361                .downcast_ref::<Date64Array>()
362                .ok_or_else(|| {
363                    internal_datafusion_err!(
364                        "Expected Date64Array but found {}",
365                        array.data_type()
366                    )
367                })?;
368            (min(arr), max(arr))
369        }
370        _ => return Ok(()), // Not a date type, nothing to do
371    };
372
373    // Only validate the min and max values instead of all elements
374    if let Some(min) = min_val {
375        ensure_timestamp_in_bounds(min, multiplier, &source_type, cast_type)?;
376    }
377    if let Some(max) = max_val {
378        ensure_timestamp_in_bounds(max, multiplier, &source_type, cast_type)?;
379    }
380
381    Ok(())
382}
383
384// Implement Display trait for ColumnarValue
385impl fmt::Display for ColumnarValue {
386    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
387        let formatted = match self {
388            ColumnarValue::Array(array) => {
389                pretty_format_columns("ColumnarValue(ArrayRef)", &[Arc::clone(array)])
390            }
391            ColumnarValue::Scalar(_) => {
392                if let Ok(array) = self.to_array(1) {
393                    pretty_format_columns("ColumnarValue(ScalarValue)", &[array])
394                } else {
395                    return write!(f, "Error formatting columnar value");
396                }
397            }
398        };
399
400        if let Ok(formatted) = formatted {
401            write!(f, "{formatted}")
402        } else {
403            write!(f, "Error formatting columnar value")
404        }
405    }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Date64Array, Int32Array, StructArray},
        datatypes::{Field, Fields, TimeUnit},
    };

    /// `into_array_of_size` returns arrays of the right length unchanged,
    /// expands scalars, and rejects arrays of the wrong length.
    #[test]
    fn into_array_of_size() {
        // Array case
        let arr = make_array(1, 3);
        let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
        assert_eq!(&arr_columnar_value.into_array_of_size(3).unwrap(), &arr);

        // Scalar case
        let scalar_columnar_value = ColumnarValue::Scalar(ScalarValue::Int32(Some(42)));
        let expected_array = make_array(42, 100);
        assert_eq!(
            &scalar_columnar_value.into_array_of_size(100).unwrap(),
            &expected_array
        );

        // Array case with wrong size
        let arr = make_array(1, 3);
        let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
        let result = arr_columnar_value.into_array_of_size(5);
        let err = result.unwrap_err();
        assert!(
            err.to_string().starts_with(
                "Internal error: Array length 3 does not match expected length 5"
            ),
            "Found: {err}"
        );
    }

    /// The borrowing `to_array_of_size` must behave like
    /// `into_array_of_size` while leaving the value usable afterwards.
    #[test]
    fn to_array_of_size() {
        // Array case: the same value is reused for the success and error paths
        let arr = make_array(1, 3);
        let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
        assert_eq!(&arr_columnar_value.to_array_of_size(3).unwrap(), &arr);

        // Array case with wrong size
        let err = arr_columnar_value.to_array_of_size(5).unwrap_err();
        assert!(
            err.to_string().starts_with(
                "Internal error: Array length 3 does not match expected length 5"
            ),
            "Found: {err}"
        );

        // Scalar case
        let scalar_columnar_value = ColumnarValue::Scalar(ScalarValue::Int32(Some(42)));
        let expected_array = make_array(42, 100);
        assert_eq!(
            &scalar_columnar_value.to_array_of_size(100).unwrap(),
            &expected_array
        );
    }

    /// Table-driven check of `values_to_arrays` for the non-error cases.
    #[test]
    fn values_to_arrays() {
        // (input, expected)
        let cases = vec![
            // empty
            TestCase {
                input: vec![],
                expected: vec![],
            },
            // one array of length 3
            TestCase {
                input: vec![ColumnarValue::Array(make_array(1, 3))],
                expected: vec![make_array(1, 3)],
            },
            // two arrays length 3
            TestCase {
                input: vec![
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Array(make_array(2, 3)),
                ],
                expected: vec![make_array(1, 3), make_array(2, 3)],
            },
            // array and scalar
            TestCase {
                input: vec![
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                ],
                expected: vec![
                    make_array(1, 3),
                    make_array(100, 3), // scalar is expanded
                ],
            },
            // scalar and array
            TestCase {
                input: vec![
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                    ColumnarValue::Array(make_array(1, 3)),
                ],
                expected: vec![
                    make_array(100, 3), // scalar is expanded
                    make_array(1, 3),
                ],
            },
            // multiple scalars and array
            TestCase {
                input: vec![
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(200))),
                ],
                expected: vec![
                    make_array(100, 3), // scalar is expanded
                    make_array(1, 3),
                    make_array(200, 3), // scalar is expanded
                ],
            },
            // only scalars: with no array to take a length from, each scalar
            // becomes a 1 element array
            TestCase {
                input: vec![
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(200))),
                ],
                expected: vec![make_array(100, 1), make_array(200, 1)],
            },
        ];
        for case in cases {
            case.run();
        }
    }

    /// Two arrays of different lengths must be rejected.
    #[test]
    #[should_panic(
        expected = "Arguments has mixed length. Expected length: 3, found length: 4"
    )]
    fn values_to_arrays_mixed_length() {
        ColumnarValue::values_to_arrays(&[
            ColumnarValue::Array(make_array(1, 3)),
            ColumnarValue::Array(make_array(2, 4)),
        ])
        .unwrap();
    }

    /// A scalar between two mismatched arrays must not hide the mismatch.
    #[test]
    #[should_panic(
        expected = "Arguments has mixed length. Expected length: 3, found length: 7"
    )]
    fn values_to_arrays_mixed_length_and_scalar() {
        ColumnarValue::values_to_arrays(&[
            ColumnarValue::Array(make_array(1, 3)),
            ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
            ColumnarValue::Array(make_array(2, 7)),
        ])
        .unwrap();
    }

    /// One `values_to_arrays` scenario: the inputs and the arrays they are
    /// expected to expand to.
    struct TestCase {
        input: Vec<ColumnarValue>,
        expected: Vec<ArrayRef>,
    }

    impl TestCase {
        /// Runs `values_to_arrays` on `input` and compares with `expected`.
        fn run(self) {
            let Self { input, expected } = self;

            assert_eq!(
                ColumnarValue::values_to_arrays(&input).unwrap(),
                expected,
                "\ninput: {input:?}\nexpected: {expected:?}"
            );
        }
    }

    /// Makes an array of length `len` with all elements set to `val`
    fn make_array(val: i32, len: usize) -> ArrayRef {
        Arc::new(Int32Array::from(vec![val; len]))
    }

    #[test]
    fn test_display_scalar() {
        let column = ColumnarValue::from(ScalarValue::from("foo"));
        assert_eq!(
            column.to_string(),
            concat!(
                "+----------------------------+\n",
                "| ColumnarValue(ScalarValue) |\n",
                "+----------------------------+\n",
                "| foo                        |\n",
                "+----------------------------+"
            )
        );
    }

    #[test]
    fn test_display_array() {
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(vec![1, 2, 3]));
        let column = ColumnarValue::from(array);
        assert_eq!(
            column.to_string(),
            concat!(
                "+-------------------------+\n",
                "| ColumnarValue(ArrayRef) |\n",
                "+-------------------------+\n",
                "| 1                       |\n",
                "| 2                       |\n",
                "| 3                       |\n",
                "+-------------------------+"
            )
        );
    }

    /// Struct fields are matched by name: a source laid out as (b, a) cast to
    /// a target laid out as (a, b) must keep each value with its field name.
    #[test]
    fn cast_struct_by_field_name() {
        let source_fields = Fields::from(vec![
            Field::new("b", DataType::Int32, true),
            Field::new("a", DataType::Int32, true),
        ]);

        let target_fields = Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]);

        let struct_array = StructArray::new(
            source_fields,
            vec![
                Arc::new(Int32Array::from(vec![Some(3)])),
                Arc::new(Int32Array::from(vec![Some(4)])),
            ],
            None,
        );

        let value = ColumnarValue::Array(Arc::new(struct_array));
        // `target_fields` is not needed afterwards, so it is moved, not cloned.
        let casted = value
            .cast_to(&DataType::Struct(target_fields), None)
            .expect("struct cast should succeed");

        let ColumnarValue::Array(arr) = casted else {
            panic!("expected array after cast");
        };

        let struct_array = arr
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("expected StructArray");

        let field_a = struct_array
            .column_by_name("a")
            .expect("expected field a in cast result");
        let field_b = struct_array
            .column_by_name("b")
            .expect("expected field b in cast result");

        assert_eq!(
            field_a
                .as_any()
                .downcast_ref::<Int32Array>()
                .expect("expected Int32 array")
                .value(0),
            4
        );
        assert_eq!(
            field_b
                .as_any()
                .downcast_ref::<Int32Array>()
                .expect("expected Int32 array")
                .value(0),
            3
        );
    }

    /// A target field that is absent from the source struct is added and
    /// filled with nulls.
    #[test]
    fn cast_struct_missing_field_inserts_nulls() {
        let source_fields = Fields::from(vec![Field::new("a", DataType::Int32, true)]);

        let target_fields = Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]);

        let struct_array = StructArray::new(
            source_fields,
            vec![Arc::new(Int32Array::from(vec![Some(5)]))],
            None,
        );

        let value = ColumnarValue::Array(Arc::new(struct_array));
        // `target_fields` is not needed afterwards, so it is moved, not cloned.
        let casted = value
            .cast_to(&DataType::Struct(target_fields), None)
            .expect("struct cast should succeed");

        let ColumnarValue::Array(arr) = casted else {
            panic!("expected array after cast");
        };

        let struct_array = arr
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("expected StructArray");

        let field_b = struct_array
            .column_by_name("b")
            .expect("expected missing field to be added");

        assert!(field_b.is_null(0));
    }

    /// A Date64 value whose nanosecond representation exceeds `i64::MAX`
    /// must be reported as an error by the cast.
    #[test]
    fn cast_date64_array_to_timestamp_overflow() {
        let overflow_value = i64::MAX / 1_000_000 + 1;
        let array: ArrayRef = Arc::new(Date64Array::from(vec![Some(overflow_value)]));
        let value = ColumnarValue::Array(array);
        let result =
            value.cast_to(&DataType::Timestamp(TimeUnit::Nanosecond, None), None);
        let err = result.expect_err("expected overflow to be detected");
        assert!(
            err.to_string()
                .contains("converted value exceeds the representable i64 range"),
            "unexpected error: {err}"
        );
    }
}