datafusion_expr_common/
columnar_value.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ColumnarValue`] represents the result of evaluating an expression.
19
20use arrow::{
21    array::{Array, ArrayRef, Date32Array, Date64Array, NullArray},
22    compute::{CastOptions, kernels, max, min},
23    datatypes::DataType,
24    util::pretty::pretty_format_columns,
25};
26use datafusion_common::internal_datafusion_err;
27use datafusion_common::{
28    Result, ScalarValue,
29    format::DEFAULT_CAST_OPTIONS,
30    internal_err,
31    scalar::{date_to_timestamp_multiplier, ensure_timestamp_in_bounds},
32};
33use std::fmt;
34use std::sync::Arc;
35
36/// The result of evaluating an expression.
37///
38/// [`ColumnarValue::Scalar`] represents a single value repeated any number of
39/// times. This is an important performance optimization for handling values
40/// that do not change across rows.
41///
42/// [`ColumnarValue::Array`] represents a column of data, stored as an  Arrow
43/// [`ArrayRef`]
44///
45/// A slice of `ColumnarValue`s logically represents a table, with each column
46/// having the same number of rows. This means that all `Array`s are the same
47/// length.
48///
49/// # Example
50///
51/// A `ColumnarValue::Array` with an array of 5 elements and a
52/// `ColumnarValue::Scalar` with the value 100
53///
54/// ```text
55/// ┌──────────────┐
56/// │ ┌──────────┐ │
57/// │ │   "A"    │ │
58/// │ ├──────────┤ │
59/// │ │   "B"    │ │
60/// │ ├──────────┤ │
61/// │ │   "C"    │ │
62/// │ ├──────────┤ │
63/// │ │   "D"    │ │        ┌──────────────┐
64/// │ ├──────────┤ │        │ ┌──────────┐ │
65/// │ │   "E"    │ │        │ │   100    │ │
66/// │ └──────────┘ │        │ └──────────┘ │
67/// └──────────────┘        └──────────────┘
68///
69///  ColumnarValue::        ColumnarValue::
70///       Array                 Scalar
71/// ```
72///
73/// Logically represents the following table:
74///
75/// | Column 1| Column 2 |
76/// | ------- | -------- |
77/// | A | 100 |
78/// | B | 100 |
79/// | C | 100 |
80/// | D | 100 |
81/// | E | 100 |
82///
83/// # Performance Notes
84///
85/// When implementing functions or operators, it is important to consider the
86/// performance implications of handling scalar values.
87///
88/// Because all functions must handle [`ArrayRef`], it is
89/// convenient to convert [`ColumnarValue::Scalar`]s using
90/// [`Self::into_array`]. For example,  [`ColumnarValue::values_to_arrays`]
91/// converts multiple columnar values into arrays of the same length.
92///
93/// However, it is often much more performant to provide a different,
94/// implementation that handles scalar values differently
95#[derive(Clone, Debug)]
96pub enum ColumnarValue {
97    /// Array of values
98    Array(ArrayRef),
99    /// A single value
100    Scalar(ScalarValue),
101}
102
103impl From<ArrayRef> for ColumnarValue {
104    fn from(value: ArrayRef) -> Self {
105        ColumnarValue::Array(value)
106    }
107}
108
109impl From<ScalarValue> for ColumnarValue {
110    fn from(value: ScalarValue) -> Self {
111        ColumnarValue::Scalar(value)
112    }
113}
114
115impl ColumnarValue {
116    pub fn data_type(&self) -> DataType {
117        match self {
118            ColumnarValue::Array(array_value) => array_value.data_type().clone(),
119            ColumnarValue::Scalar(scalar_value) => scalar_value.data_type(),
120        }
121    }
122
123    /// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
124    /// number of rows  by repeating the same scalar multiple times,
125    /// which is not as efficient as handling the scalar directly.
126    /// [`Self::Array`] will just be returned as is.
127    ///
128    /// See [`Self::into_array_of_size`] if you need to validate the length of the output array.
129    ///
130    /// See [`Self::values_to_arrays`] to convert multiple columnar values into
131    /// arrays of the same length.
132    ///
133    /// # Errors
134    ///
135    /// Errors if `self` is a Scalar that fails to be converted into an array of size
136    pub fn into_array(self, num_rows: usize) -> Result<ArrayRef> {
137        Ok(match self {
138            ColumnarValue::Array(array) => array,
139            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
140        })
141    }
142
143    /// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
144    /// number of rows. [`Self::Scalar`] is converted by repeating the same
145    /// scalar multiple times which is not as efficient as handling the scalar
146    /// directly.
147    /// This validates that if this is [`Self::Array`], it has the expected length.
148    ///
149    /// See [`Self::values_to_arrays`] to convert multiple columnar values into
150    /// arrays of the same length.
151    ///
152    /// # Errors
153    ///
154    /// Errors if `self` is a Scalar that fails to be converted into an array of size or
155    /// if the array length does not match the expected length
156    pub fn into_array_of_size(self, num_rows: usize) -> Result<ArrayRef> {
157        match self {
158            ColumnarValue::Array(array) => {
159                if array.len() == num_rows {
160                    Ok(array)
161                } else {
162                    internal_err!(
163                        "Array length {} does not match expected length {}",
164                        array.len(),
165                        num_rows
166                    )
167                }
168            }
169            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
170        }
171    }
172
173    /// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
174    /// number of rows  by repeating the same scalar multiple times,
175    /// which is not as efficient as handling the scalar directly.
176    /// [`Self::Array`] will just be returned as is.
177    ///
178    /// See [`Self::to_array_of_size`] if you need to validate the length of the output array.
179    ///
180    /// See [`Self::values_to_arrays`] to convert multiple columnar values into
181    /// arrays of the same length.
182    ///
183    /// # Errors
184    ///
185    /// Errors if `self` is a Scalar that fails to be converted into an array of size
186    pub fn to_array(&self, num_rows: usize) -> Result<ArrayRef> {
187        Ok(match self {
188            ColumnarValue::Array(array) => Arc::clone(array),
189            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
190        })
191    }
192
193    /// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
194    /// number of rows. [`Self::Scalar`] is converted by repeating the same
195    /// scalar multiple times which is not as efficient as handling the scalar
196    /// directly.
197    /// This validates that if this is [`Self::Array`], it has the expected length.
198    ///
199    /// See [`Self::values_to_arrays`] to convert multiple columnar values into
200    /// arrays of the same length.
201    ///
202    /// # Errors
203    ///
204    /// Errors if `self` is a Scalar that fails to be converted into an array of size or
205    /// if the array length does not match the expected length
206    pub fn to_array_of_size(&self, num_rows: usize) -> Result<ArrayRef> {
207        match self {
208            ColumnarValue::Array(array) => {
209                if array.len() == num_rows {
210                    Ok(Arc::clone(array))
211                } else {
212                    internal_err!(
213                        "Array length {} does not match expected length {}",
214                        array.len(),
215                        num_rows
216                    )
217                }
218            }
219            ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
220        }
221    }
222
223    /// Null columnar values are implemented as a null array in order to pass batch
224    /// num_rows
225    pub fn create_null_array(num_rows: usize) -> Self {
226        ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
227    }
228
229    /// Converts  [`ColumnarValue`]s to [`ArrayRef`]s with the same length.
230    ///
231    /// # Performance Note
232    ///
233    /// This function expands any [`ScalarValue`] to an array. This expansion
234    /// permits using a single function in terms of arrays, but it can be
235    /// inefficient compared to handling the scalar value directly.
236    ///
237    /// Thus, it is recommended to provide specialized implementations for
238    /// scalar values if performance is a concern.
239    ///
240    /// # Errors
241    ///
242    /// If there are multiple array arguments that have different lengths
243    pub fn values_to_arrays(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
244        if args.is_empty() {
245            return Ok(vec![]);
246        }
247
248        let mut array_len = None;
249        for arg in args {
250            array_len = match (arg, array_len) {
251                (ColumnarValue::Array(a), None) => Some(a.len()),
252                (ColumnarValue::Array(a), Some(array_len)) => {
253                    if array_len == a.len() {
254                        Some(array_len)
255                    } else {
256                        return internal_err!(
257                            "Arguments has mixed length. Expected length: {array_len}, found length: {}",
258                            a.len()
259                        );
260                    }
261                }
262                (ColumnarValue::Scalar(_), array_len) => array_len,
263            }
264        }
265
266        // If array_len is none, it means there are only scalars, so make a 1 element array
267        let inferred_length = array_len.unwrap_or(1);
268
269        let args = args
270            .iter()
271            .map(|arg| arg.to_array(inferred_length))
272            .collect::<Result<Vec<_>>>()?;
273
274        Ok(args)
275    }
276
277    /// Cast's this [ColumnarValue] to the specified `DataType`
278    pub fn cast_to(
279        &self,
280        cast_type: &DataType,
281        cast_options: Option<&CastOptions<'static>>,
282    ) -> Result<ColumnarValue> {
283        let cast_options = cast_options.cloned().unwrap_or(DEFAULT_CAST_OPTIONS);
284        match self {
285            ColumnarValue::Array(array) => {
286                ensure_date_array_timestamp_bounds(array, cast_type)?;
287                Ok(ColumnarValue::Array(kernels::cast::cast_with_options(
288                    array,
289                    cast_type,
290                    &cast_options,
291                )?))
292            }
293            ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
294                scalar.cast_to_with_options(cast_type, &cast_options)?,
295            )),
296        }
297    }
298}
299
300fn ensure_date_array_timestamp_bounds(
301    array: &ArrayRef,
302    cast_type: &DataType,
303) -> Result<()> {
304    let source_type = array.data_type().clone();
305    let Some(multiplier) = date_to_timestamp_multiplier(&source_type, cast_type) else {
306        return Ok(());
307    };
308
309    if multiplier <= 1 {
310        return Ok(());
311    }
312
313    // Use compute kernels to find min/max instead of iterating all elements
314    let (min_val, max_val): (Option<i64>, Option<i64>) = match &source_type {
315        DataType::Date32 => {
316            let arr = array
317                .as_any()
318                .downcast_ref::<Date32Array>()
319                .ok_or_else(|| {
320                    internal_datafusion_err!(
321                        "Expected Date32Array but found {}",
322                        array.data_type()
323                    )
324                })?;
325            (min(arr).map(|v| v as i64), max(arr).map(|v| v as i64))
326        }
327        DataType::Date64 => {
328            let arr = array
329                .as_any()
330                .downcast_ref::<Date64Array>()
331                .ok_or_else(|| {
332                    internal_datafusion_err!(
333                        "Expected Date64Array but found {}",
334                        array.data_type()
335                    )
336                })?;
337            (min(arr), max(arr))
338        }
339        _ => return Ok(()), // Not a date type, nothing to do
340    };
341
342    // Only validate the min and max values instead of all elements
343    if let Some(min) = min_val {
344        ensure_timestamp_in_bounds(min, multiplier, &source_type, cast_type)?;
345    }
346    if let Some(max) = max_val {
347        ensure_timestamp_in_bounds(max, multiplier, &source_type, cast_type)?;
348    }
349
350    Ok(())
351}
352
353// Implement Display trait for ColumnarValue
354impl fmt::Display for ColumnarValue {
355    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
356        let formatted = match self {
357            ColumnarValue::Array(array) => {
358                pretty_format_columns("ColumnarValue(ArrayRef)", &[Arc::clone(array)])
359            }
360            ColumnarValue::Scalar(_) => {
361                if let Ok(array) = self.to_array(1) {
362                    pretty_format_columns("ColumnarValue(ScalarValue)", &[array])
363                } else {
364                    return write!(f, "Error formatting columnar value");
365                }
366            }
367        };
368
369        if let Ok(formatted) = formatted {
370            write!(f, "{formatted}")
371        } else {
372            write!(f, "Error formatting columnar value")
373        }
374    }
375}
376
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Date64Array, Int32Array},
        datatypes::TimeUnit,
    };

    #[test]
    fn into_array_of_size() {
        // Array case
        let arr = make_array(1, 3);
        let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
        assert_eq!(&arr_columnar_value.into_array_of_size(3).unwrap(), &arr);

        // Scalar case
        let scalar_columnar_value = ColumnarValue::Scalar(ScalarValue::Int32(Some(42)));
        let expected_array = make_array(42, 100);
        assert_eq!(
            &scalar_columnar_value.into_array_of_size(100).unwrap(),
            &expected_array
        );

        // Array case with wrong size
        let arr = make_array(1, 3);
        let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
        let result = arr_columnar_value.into_array_of_size(5);
        let err = result.unwrap_err();
        assert!(
            err.to_string().starts_with(
                "Internal error: Array length 3 does not match expected length 5"
            ),
            "Found: {err}"
        );
    }

    #[test]
    fn to_array_of_size() {
        // Array case
        let arr = make_array(1, 3);
        let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
        assert_eq!(&arr_columnar_value.to_array_of_size(3).unwrap(), &arr);

        // Scalar case
        let scalar_columnar_value = ColumnarValue::Scalar(ScalarValue::Int32(Some(42)));
        assert_eq!(
            &scalar_columnar_value.to_array_of_size(100).unwrap(),
            &make_array(42, 100)
        );

        // Array case with wrong size (the value is borrowed, so reuse it)
        let err = arr_columnar_value.to_array_of_size(5).unwrap_err();
        assert!(
            err.to_string().starts_with(
                "Internal error: Array length 3 does not match expected length 5"
            ),
            "Found: {err}"
        );
    }

    #[test]
    fn into_array_does_not_validate_array_length() {
        // Unlike the `*_of_size` variants, `to_array` / `into_array` return an
        // Array unchanged regardless of `num_rows`
        let arr = make_array(1, 3);
        let value = ColumnarValue::Array(Arc::clone(&arr));
        assert_eq!(&value.to_array(5).unwrap(), &arr);
        assert_eq!(&value.into_array(5).unwrap(), &arr);
    }

    #[test]
    fn values_to_arrays() {
        // (input, expected)
        let cases = vec![
            // empty
            TestCase {
                input: vec![],
                expected: vec![],
            },
            // one array of length 3
            TestCase {
                input: vec![ColumnarValue::Array(make_array(1, 3))],
                expected: vec![make_array(1, 3)],
            },
            // two arrays length 3
            TestCase {
                input: vec![
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Array(make_array(2, 3)),
                ],
                expected: vec![make_array(1, 3), make_array(2, 3)],
            },
            // array and scalar
            TestCase {
                input: vec![
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                ],
                expected: vec![
                    make_array(1, 3),
                    make_array(100, 3), // scalar is expanded
                ],
            },
            // scalar and array
            TestCase {
                input: vec![
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                    ColumnarValue::Array(make_array(1, 3)),
                ],
                expected: vec![
                    make_array(100, 3), // scalar is expanded
                    make_array(1, 3),
                ],
            },
            // multiple scalars and array
            TestCase {
                input: vec![
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(200))),
                ],
                expected: vec![
                    make_array(100, 3), // scalar is expanded
                    make_array(1, 3),
                    make_array(200, 3), // scalar is expanded
                ],
            },
            // only scalars: each is expanded to a single-row array
            TestCase {
                input: vec![
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(200))),
                ],
                expected: vec![make_array(100, 1), make_array(200, 1)],
            },
        ];
        for case in cases {
            case.run();
        }
    }

    #[test]
    #[should_panic(
        expected = "Arguments has mixed length. Expected length: 3, found length: 4"
    )]
    fn values_to_arrays_mixed_length() {
        ColumnarValue::values_to_arrays(&[
            ColumnarValue::Array(make_array(1, 3)),
            ColumnarValue::Array(make_array(2, 4)),
        ])
        .unwrap();
    }

    #[test]
    #[should_panic(
        expected = "Arguments has mixed length. Expected length: 3, found length: 7"
    )]
    fn values_to_arrays_mixed_length_and_scalar() {
        ColumnarValue::values_to_arrays(&[
            ColumnarValue::Array(make_array(1, 3)),
            ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
            ColumnarValue::Array(make_array(2, 7)),
        ])
        .unwrap();
    }

    struct TestCase {
        input: Vec<ColumnarValue>,
        expected: Vec<ArrayRef>,
    }

    impl TestCase {
        fn run(self) {
            let Self { input, expected } = self;

            assert_eq!(
                ColumnarValue::values_to_arrays(&input).unwrap(),
                expected,
                "\ninput: {input:?}\nexpected: {expected:?}"
            );
        }
    }

    /// Makes an array of length `len` with all elements set to `val`
    fn make_array(val: i32, len: usize) -> ArrayRef {
        Arc::new(Int32Array::from(vec![val; len]))
    }

    #[test]
    fn test_display_scalar() {
        let column = ColumnarValue::from(ScalarValue::from("foo"));
        assert_eq!(
            column.to_string(),
            concat!(
                "+----------------------------+\n",
                "| ColumnarValue(ScalarValue) |\n",
                "+----------------------------+\n",
                "| foo                        |\n",
                "+----------------------------+"
            )
        );
    }

    #[test]
    fn test_display_array() {
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(vec![1, 2, 3]));
        let column = ColumnarValue::from(array);
        assert_eq!(
            column.to_string(),
            concat!(
                "+-------------------------+\n",
                "| ColumnarValue(ArrayRef) |\n",
                "+-------------------------+\n",
                "| 1                       |\n",
                "| 2                       |\n",
                "| 3                       |\n",
                "+-------------------------+"
            )
        );
    }

    #[test]
    fn cast_date64_array_to_timestamp_overflow() {
        let overflow_value = i64::MAX / 1_000_000 + 1;
        let array: ArrayRef = Arc::new(Date64Array::from(vec![Some(overflow_value)]));
        let value = ColumnarValue::Array(array);
        let result =
            value.cast_to(&DataType::Timestamp(TimeUnit::Nanosecond, None), None);
        let err = result.expect_err("expected overflow to be detected");
        assert!(
            err.to_string()
                .contains("converted value exceeds the representable i64 range"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn cast_all_null_date64_array_to_timestamp() {
        // With no non-null values there is no min/max to bounds-check, so the
        // cast must succeed
        let array: ArrayRef = Arc::new(Date64Array::from(vec![None::<i64>, None]));
        let value = ColumnarValue::Array(array);
        let target = DataType::Timestamp(TimeUnit::Nanosecond, None);
        let result = value.cast_to(&target, None).unwrap();
        assert_eq!(result.data_type(), target);
    }
}
570}