datafusion_expr_common/columnar_value.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ColumnarValue`] represents the result of evaluating an expression.
19
20use arrow::{
21 array::{Array, ArrayRef, Date32Array, Date64Array, NullArray},
22 compute::{CastOptions, kernels, max, min},
23 datatypes::DataType,
24 util::pretty::pretty_format_columns,
25};
26use datafusion_common::internal_datafusion_err;
27use datafusion_common::{
28 Result, ScalarValue,
29 format::DEFAULT_CAST_OPTIONS,
30 internal_err,
31 scalar::{date_to_timestamp_multiplier, ensure_timestamp_in_bounds},
32};
33use std::fmt;
34use std::sync::Arc;
35
36/// The result of evaluating an expression.
37///
38/// [`ColumnarValue::Scalar`] represents a single value repeated any number of
39/// times. This is an important performance optimization for handling values
40/// that do not change across rows.
41///
42/// [`ColumnarValue::Array`] represents a column of data, stored as an Arrow
43/// [`ArrayRef`]
44///
45/// A slice of `ColumnarValue`s logically represents a table, with each column
46/// having the same number of rows. This means that all `Array`s are the same
47/// length.
48///
49/// # Example
50///
51/// A `ColumnarValue::Array` with an array of 5 elements and a
52/// `ColumnarValue::Scalar` with the value 100
53///
54/// ```text
55/// ┌──────────────┐
56/// │ ┌──────────┐ │
57/// │ │ "A" │ │
58/// │ ├──────────┤ │
59/// │ │ "B" │ │
60/// │ ├──────────┤ │
61/// │ │ "C" │ │
62/// │ ├──────────┤ │
63/// │ │ "D" │ │ ┌──────────────┐
64/// │ ├──────────┤ │ │ ┌──────────┐ │
65/// │ │ "E" │ │ │ │ 100 │ │
66/// │ └──────────┘ │ │ └──────────┘ │
67/// └──────────────┘ └──────────────┘
68///
69/// ColumnarValue:: ColumnarValue::
70/// Array Scalar
71/// ```
72///
73/// Logically represents the following table:
74///
75/// | Column 1| Column 2 |
76/// | ------- | -------- |
77/// | A | 100 |
78/// | B | 100 |
79/// | C | 100 |
80/// | D | 100 |
81/// | E | 100 |
82///
83/// # Performance Notes
84///
85/// When implementing functions or operators, it is important to consider the
86/// performance implications of handling scalar values.
87///
88/// Because all functions must handle [`ArrayRef`], it is
89/// convenient to convert [`ColumnarValue::Scalar`]s using
90/// [`Self::into_array`]. For example, [`ColumnarValue::values_to_arrays`]
91/// converts multiple columnar values into arrays of the same length.
92///
93/// However, it is often much more performant to provide a different,
94/// implementation that handles scalar values differently
95#[derive(Clone, Debug)]
96pub enum ColumnarValue {
97 /// Array of values
98 Array(ArrayRef),
99 /// A single value
100 Scalar(ScalarValue),
101}
102
103impl From<ArrayRef> for ColumnarValue {
104 fn from(value: ArrayRef) -> Self {
105 ColumnarValue::Array(value)
106 }
107}
108
109impl From<ScalarValue> for ColumnarValue {
110 fn from(value: ScalarValue) -> Self {
111 ColumnarValue::Scalar(value)
112 }
113}
114
115impl ColumnarValue {
116 pub fn data_type(&self) -> DataType {
117 match self {
118 ColumnarValue::Array(array_value) => array_value.data_type().clone(),
119 ColumnarValue::Scalar(scalar_value) => scalar_value.data_type(),
120 }
121 }
122
123 /// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
124 /// number of rows by repeating the same scalar multiple times,
125 /// which is not as efficient as handling the scalar directly.
126 /// [`Self::Array`] will just be returned as is.
127 ///
128 /// See [`Self::into_array_of_size`] if you need to validate the length of the output array.
129 ///
130 /// See [`Self::values_to_arrays`] to convert multiple columnar values into
131 /// arrays of the same length.
132 ///
133 /// # Errors
134 ///
135 /// Errors if `self` is a Scalar that fails to be converted into an array of size
136 pub fn into_array(self, num_rows: usize) -> Result<ArrayRef> {
137 Ok(match self {
138 ColumnarValue::Array(array) => array,
139 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
140 })
141 }
142
143 /// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
144 /// number of rows. [`Self::Scalar`] is converted by repeating the same
145 /// scalar multiple times which is not as efficient as handling the scalar
146 /// directly.
147 /// This validates that if this is [`Self::Array`], it has the expected length.
148 ///
149 /// See [`Self::values_to_arrays`] to convert multiple columnar values into
150 /// arrays of the same length.
151 ///
152 /// # Errors
153 ///
154 /// Errors if `self` is a Scalar that fails to be converted into an array of size or
155 /// if the array length does not match the expected length
156 pub fn into_array_of_size(self, num_rows: usize) -> Result<ArrayRef> {
157 match self {
158 ColumnarValue::Array(array) => {
159 if array.len() == num_rows {
160 Ok(array)
161 } else {
162 internal_err!(
163 "Array length {} does not match expected length {}",
164 array.len(),
165 num_rows
166 )
167 }
168 }
169 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
170 }
171 }
172
173 /// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
174 /// number of rows by repeating the same scalar multiple times,
175 /// which is not as efficient as handling the scalar directly.
176 /// [`Self::Array`] will just be returned as is.
177 ///
178 /// See [`Self::to_array_of_size`] if you need to validate the length of the output array.
179 ///
180 /// See [`Self::values_to_arrays`] to convert multiple columnar values into
181 /// arrays of the same length.
182 ///
183 /// # Errors
184 ///
185 /// Errors if `self` is a Scalar that fails to be converted into an array of size
186 pub fn to_array(&self, num_rows: usize) -> Result<ArrayRef> {
187 Ok(match self {
188 ColumnarValue::Array(array) => Arc::clone(array),
189 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
190 })
191 }
192
193 /// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
194 /// number of rows. [`Self::Scalar`] is converted by repeating the same
195 /// scalar multiple times which is not as efficient as handling the scalar
196 /// directly.
197 /// This validates that if this is [`Self::Array`], it has the expected length.
198 ///
199 /// See [`Self::values_to_arrays`] to convert multiple columnar values into
200 /// arrays of the same length.
201 ///
202 /// # Errors
203 ///
204 /// Errors if `self` is a Scalar that fails to be converted into an array of size or
205 /// if the array length does not match the expected length
206 pub fn to_array_of_size(&self, num_rows: usize) -> Result<ArrayRef> {
207 match self {
208 ColumnarValue::Array(array) => {
209 if array.len() == num_rows {
210 Ok(Arc::clone(array))
211 } else {
212 internal_err!(
213 "Array length {} does not match expected length {}",
214 array.len(),
215 num_rows
216 )
217 }
218 }
219 ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
220 }
221 }
222
223 /// Null columnar values are implemented as a null array in order to pass batch
224 /// num_rows
225 pub fn create_null_array(num_rows: usize) -> Self {
226 ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
227 }
228
229 /// Converts [`ColumnarValue`]s to [`ArrayRef`]s with the same length.
230 ///
231 /// # Performance Note
232 ///
233 /// This function expands any [`ScalarValue`] to an array. This expansion
234 /// permits using a single function in terms of arrays, but it can be
235 /// inefficient compared to handling the scalar value directly.
236 ///
237 /// Thus, it is recommended to provide specialized implementations for
238 /// scalar values if performance is a concern.
239 ///
240 /// # Errors
241 ///
242 /// If there are multiple array arguments that have different lengths
243 pub fn values_to_arrays(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
244 if args.is_empty() {
245 return Ok(vec![]);
246 }
247
248 let mut array_len = None;
249 for arg in args {
250 array_len = match (arg, array_len) {
251 (ColumnarValue::Array(a), None) => Some(a.len()),
252 (ColumnarValue::Array(a), Some(array_len)) => {
253 if array_len == a.len() {
254 Some(array_len)
255 } else {
256 return internal_err!(
257 "Arguments has mixed length. Expected length: {array_len}, found length: {}",
258 a.len()
259 );
260 }
261 }
262 (ColumnarValue::Scalar(_), array_len) => array_len,
263 }
264 }
265
266 // If array_len is none, it means there are only scalars, so make a 1 element array
267 let inferred_length = array_len.unwrap_or(1);
268
269 let args = args
270 .iter()
271 .map(|arg| arg.to_array(inferred_length))
272 .collect::<Result<Vec<_>>>()?;
273
274 Ok(args)
275 }
276
277 /// Cast's this [ColumnarValue] to the specified `DataType`
278 pub fn cast_to(
279 &self,
280 cast_type: &DataType,
281 cast_options: Option<&CastOptions<'static>>,
282 ) -> Result<ColumnarValue> {
283 let cast_options = cast_options.cloned().unwrap_or(DEFAULT_CAST_OPTIONS);
284 match self {
285 ColumnarValue::Array(array) => {
286 ensure_date_array_timestamp_bounds(array, cast_type)?;
287 Ok(ColumnarValue::Array(kernels::cast::cast_with_options(
288 array,
289 cast_type,
290 &cast_options,
291 )?))
292 }
293 ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
294 scalar.cast_to_with_options(cast_type, &cast_options)?,
295 )),
296 }
297 }
298}
299
300fn ensure_date_array_timestamp_bounds(
301 array: &ArrayRef,
302 cast_type: &DataType,
303) -> Result<()> {
304 let source_type = array.data_type().clone();
305 let Some(multiplier) = date_to_timestamp_multiplier(&source_type, cast_type) else {
306 return Ok(());
307 };
308
309 if multiplier <= 1 {
310 return Ok(());
311 }
312
313 // Use compute kernels to find min/max instead of iterating all elements
314 let (min_val, max_val): (Option<i64>, Option<i64>) = match &source_type {
315 DataType::Date32 => {
316 let arr = array
317 .as_any()
318 .downcast_ref::<Date32Array>()
319 .ok_or_else(|| {
320 internal_datafusion_err!(
321 "Expected Date32Array but found {}",
322 array.data_type()
323 )
324 })?;
325 (min(arr).map(|v| v as i64), max(arr).map(|v| v as i64))
326 }
327 DataType::Date64 => {
328 let arr = array
329 .as_any()
330 .downcast_ref::<Date64Array>()
331 .ok_or_else(|| {
332 internal_datafusion_err!(
333 "Expected Date64Array but found {}",
334 array.data_type()
335 )
336 })?;
337 (min(arr), max(arr))
338 }
339 _ => return Ok(()), // Not a date type, nothing to do
340 };
341
342 // Only validate the min and max values instead of all elements
343 if let Some(min) = min_val {
344 ensure_timestamp_in_bounds(min, multiplier, &source_type, cast_type)?;
345 }
346 if let Some(max) = max_val {
347 ensure_timestamp_in_bounds(max, multiplier, &source_type, cast_type)?;
348 }
349
350 Ok(())
351}
352
353// Implement Display trait for ColumnarValue
354impl fmt::Display for ColumnarValue {
355 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
356 let formatted = match self {
357 ColumnarValue::Array(array) => {
358 pretty_format_columns("ColumnarValue(ArrayRef)", &[Arc::clone(array)])
359 }
360 ColumnarValue::Scalar(_) => {
361 if let Ok(array) = self.to_array(1) {
362 pretty_format_columns("ColumnarValue(ScalarValue)", &[array])
363 } else {
364 return write!(f, "Error formatting columnar value");
365 }
366 }
367 };
368
369 if let Ok(formatted) = formatted {
370 write!(f, "{formatted}")
371 } else {
372 write!(f, "Error formatting columnar value")
373 }
374 }
375}
376
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Date64Array, Int32Array},
        datatypes::TimeUnit,
    };

    /// Makes an array of length `len` with all elements set to `val`
    fn make_array(val: i32, len: usize) -> ArrayRef {
        Arc::new(Int32Array::from(vec![val; len]))
    }

    /// Builds the single-column text table the `Display` tests expect.
    ///
    /// Cells are left-aligned and padded to the header's width, so the
    /// expected text does not depend on hand-counted spaces inside string
    /// literals. Every cell must be no wider than `header`.
    fn expected_table(header: &str, cells: &[&str]) -> String {
        let width = header.len();
        let border = format!("+{}+", "-".repeat(width + 2));

        let mut lines = vec![border.clone(), format!("| {header} |"), border.clone()];
        lines.extend(cells.iter().map(|cell| format!("| {cell:<width$} |")));
        lines.push(border);
        lines.join("\n")
    }

    /// One `values_to_arrays` scenario: the input values and the arrays they
    /// are expected to be converted into.
    struct TestCase {
        input: Vec<ColumnarValue>,
        expected: Vec<ArrayRef>,
    }

    impl TestCase {
        /// Runs the conversion and compares the result against `expected`
        fn run(self) {
            let Self { input, expected } = self;

            assert_eq!(
                ColumnarValue::values_to_arrays(&input).unwrap(),
                expected,
                "\ninput: {input:?}\nexpected: {expected:?}"
            );
        }
    }

    #[test]
    fn into_array_of_size() {
        // Array case
        let arr = make_array(1, 3);
        let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
        assert_eq!(&arr_columnar_value.into_array_of_size(3).unwrap(), &arr);

        // Scalar case
        let scalar_columnar_value = ColumnarValue::Scalar(ScalarValue::Int32(Some(42)));
        let expected_array = make_array(42, 100);
        assert_eq!(
            &scalar_columnar_value.into_array_of_size(100).unwrap(),
            &expected_array
        );

        // Array case with wrong size
        let arr = make_array(1, 3);
        let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
        let result = arr_columnar_value.into_array_of_size(5);
        let err = result.unwrap_err();
        assert!(
            err.to_string().starts_with(
                "Internal error: Array length 3 does not match expected length 5"
            ),
            "Found: {err}"
        );
    }

    #[test]
    fn to_array_of_size() {
        // Array case: the borrowing variant leaves the value usable afterwards
        let arr = make_array(1, 3);
        let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
        assert_eq!(&arr_columnar_value.to_array_of_size(3).unwrap(), &arr);

        // Array case with wrong size
        let err = arr_columnar_value.to_array_of_size(5).unwrap_err();
        assert!(
            err.to_string().starts_with(
                "Internal error: Array length 3 does not match expected length 5"
            ),
            "Found: {err}"
        );

        // Scalar case
        let scalar_columnar_value = ColumnarValue::Scalar(ScalarValue::Int32(Some(42)));
        assert_eq!(
            &scalar_columnar_value.to_array_of_size(100).unwrap(),
            &make_array(42, 100)
        );
    }

    #[test]
    fn values_to_arrays() {
        let cases = vec![
            // empty
            TestCase {
                input: vec![],
                expected: vec![],
            },
            // one array of length 3
            TestCase {
                input: vec![ColumnarValue::Array(make_array(1, 3))],
                expected: vec![make_array(1, 3)],
            },
            // two arrays length 3
            TestCase {
                input: vec![
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Array(make_array(2, 3)),
                ],
                expected: vec![make_array(1, 3), make_array(2, 3)],
            },
            // array and scalar
            TestCase {
                input: vec![
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                ],
                expected: vec![
                    make_array(1, 3),
                    make_array(100, 3), // scalar is expanded
                ],
            },
            // scalar and array
            TestCase {
                input: vec![
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                    ColumnarValue::Array(make_array(1, 3)),
                ],
                expected: vec![
                    make_array(100, 3), // scalar is expanded
                    make_array(1, 3),
                ],
            },
            // multiple scalars and array
            TestCase {
                input: vec![
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                    ColumnarValue::Array(make_array(1, 3)),
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(200))),
                ],
                expected: vec![
                    make_array(100, 3), // scalar is expanded
                    make_array(1, 3),
                    make_array(200, 3), // scalar is expanded
                ],
            },
            // only scalars: each becomes a 1 element array
            TestCase {
                input: vec![
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
                    ColumnarValue::Scalar(ScalarValue::Int32(Some(200))),
                ],
                expected: vec![make_array(100, 1), make_array(200, 1)],
            },
        ];
        for case in cases {
            case.run();
        }
    }

    #[test]
    #[should_panic(
        expected = "Arguments has mixed length. Expected length: 3, found length: 4"
    )]
    fn values_to_arrays_mixed_length() {
        ColumnarValue::values_to_arrays(&[
            ColumnarValue::Array(make_array(1, 3)),
            ColumnarValue::Array(make_array(2, 4)),
        ])
        .unwrap();
    }

    #[test]
    #[should_panic(
        expected = "Arguments has mixed length. Expected length: 3, found length: 7"
    )]
    fn values_to_arrays_mixed_length_and_scalar() {
        ColumnarValue::values_to_arrays(&[
            ColumnarValue::Array(make_array(1, 3)),
            ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
            ColumnarValue::Array(make_array(2, 7)),
        ])
        .unwrap();
    }

    #[test]
    fn test_display_scalar() {
        let column = ColumnarValue::from(ScalarValue::from("foo"));
        assert_eq!(
            column.to_string(),
            expected_table("ColumnarValue(ScalarValue)", &["foo"])
        );
    }

    #[test]
    fn test_display_array() {
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(vec![1, 2, 3]));
        let column = ColumnarValue::from(array);
        assert_eq!(
            column.to_string(),
            expected_table("ColumnarValue(ArrayRef)", &["1", "2", "3"])
        );
    }

    #[test]
    fn cast_date64_array_to_timestamp_overflow() {
        // Smallest value that overflows i64 once multiplied by 1_000_000
        let overflow_value = i64::MAX / 1_000_000 + 1;
        let array: ArrayRef = Arc::new(Date64Array::from(vec![Some(overflow_value)]));
        let value = ColumnarValue::Array(array);
        let result =
            value.cast_to(&DataType::Timestamp(TimeUnit::Nanosecond, None), None);
        let err = result.expect_err("expected overflow to be detected");
        assert!(
            err.to_string()
                .contains("converted value exceeds the representable i64 range"),
            "unexpected error: {err}"
        );
    }
}