Skip to main content

vortex_array/arrow/
datum.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use arrow_array::Array as ArrowArray;
5use arrow_array::ArrayRef as ArrowArrayRef;
6use arrow_array::Datum as ArrowDatum;
7use arrow_schema::DataType;
8use arrow_schema::Field;
9use vortex_error::VortexExpect;
10use vortex_error::VortexResult;
11use vortex_error::vortex_panic;
12
13use crate::ArrayRef;
14use crate::IntoArray;
15use crate::LEGACY_SESSION;
16use crate::VortexSessionExecute;
17use crate::arrays::Constant;
18use crate::arrays::ConstantArray;
19use crate::arrow::ArrowSessionExt;
20use crate::arrow::FromArrowArray;
21use crate::executor::ExecutionCtx;
22
/// A wrapper around a generic Arrow array that can be used as a Datum in Arrow compute.
///
/// The wrapper pairs the Arrow array with a flag saying whether Arrow compute should treat
/// it as a scalar; both values are handed back together by the `ArrowDatum::get`
/// implementation for this type.
#[derive(Debug)]
pub struct Datum {
    /// The Arrow array backing this datum.
    array: ArrowArrayRef,
    /// `true` when the datum was built from the `0..1` slice of a constant array and should
    /// be reported to Arrow as a scalar; `false` when it holds the full array.
    is_scalar: bool,
}
29
30impl Datum {
31    /// Create a new [`Datum`] from an [`ArrayRef`], which can then be passed to Arrow compute.
32    pub fn try_new(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
33        let session = ctx.session().clone();
34        if array.is::<Constant>() {
35            Ok(Self {
36                array: session
37                    .arrow()
38                    .execute_arrow(array.slice(0..1)?, None, ctx)?,
39                is_scalar: true,
40            })
41        } else {
42            Ok(Self {
43                array: session.arrow().execute_arrow(array.clone(), None, ctx)?,
44                is_scalar: false,
45            })
46        }
47    }
48
49    /// Create a new [`Datum`] from an `DynArrayData`, which can then be passed to Arrow compute.
50    /// This not try and convert the array to a scalar if it is constant.
51    pub fn try_new_array(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
52        let session = ctx.session().clone();
53        Ok(Self {
54            array: session.arrow().execute_arrow(array.clone(), None, ctx)?,
55            is_scalar: false,
56        })
57    }
58
59    pub fn try_new_with_target_datatype(
60        array: &ArrayRef,
61        target_datatype: &DataType,
62        ctx: &mut ExecutionCtx,
63    ) -> VortexResult<Self> {
64        let session = ctx.session().clone();
65        let target_field = Field::new(
66            String::new(),
67            target_datatype.clone(),
68            array.dtype().is_nullable(),
69        );
70
71        if array.is::<Constant>() {
72            Ok(Self {
73                array: session.arrow().execute_arrow(
74                    array.slice(0..1)?,
75                    Some(&target_field),
76                    ctx,
77                )?,
78                is_scalar: true,
79            })
80        } else {
81            Ok(Self {
82                array: session
83                    .arrow()
84                    .execute_arrow(array.clone(), Some(&target_field), ctx)?,
85                is_scalar: false,
86            })
87        }
88    }
89
90    pub fn data_type(&self) -> &DataType {
91        self.array.data_type()
92    }
93}
94
95impl ArrowDatum for Datum {
96    fn get(&self) -> (&dyn ArrowArray, bool) {
97        (&self.array, self.is_scalar)
98    }
99}
100
101/// Convert an Arrow array to an Array with a specific length.
102/// This is useful for compute functions that delegate to Arrow using [Datum],
103/// which will return a scalar (length 1 Arrow array) if the input array is constant.
104///
105/// # Error
106///
107/// The provided array must have length
108pub fn from_arrow_array_with_len<A>(array: A, len: usize, nullable: bool) -> VortexResult<ArrayRef>
109where
110    ArrayRef: FromArrowArray<A>,
111{
112    let array = ArrayRef::from_arrow(array, nullable)?;
113    if array.len() == len {
114        return Ok(array);
115    }
116
117    if array.len() != 1 {
118        vortex_panic!(
119            "Array length mismatch, expected {} got {} for encoding {}",
120            len,
121            array.len(),
122            array.encoding_id()
123        );
124    }
125
126    Ok(ConstantArray::new(
127        array
128            .execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())
129            .vortex_expect("array of length 1 must support execute_scalar(0)"),
130        len,
131    )
132    .into_array())
133}