// vortex_array/arrays/chunked/vtable/mod.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

4use std::hash::Hash;
5
6use itertools::Itertools;
7use vortex_error::VortexResult;
8use vortex_error::vortex_bail;
9use vortex_error::vortex_ensure;
10use vortex_error::vortex_err;
11use vortex_error::vortex_panic;
12use vortex_session::VortexSession;
13
14use crate::ArrayRef;
15use crate::Canonical;
16use crate::EmptyMetadata;
17use crate::ExecutionCtx;
18use crate::IntoArray;
19use crate::Precision;
20use crate::ToCanonical;
21use crate::arrays::ChunkedArray;
22use crate::arrays::PrimitiveArray;
23use crate::arrays::chunked::compute::kernel::PARENT_KERNELS;
24use crate::arrays::chunked::compute::rules::PARENT_RULES;
25use crate::arrays::chunked::vtable::canonical::_canonicalize;
26use crate::buffer::BufferHandle;
27use crate::builders::ArrayBuilder;
28use crate::dtype::DType;
29use crate::dtype::Nullability;
30use crate::dtype::PType;
31use crate::hash::ArrayEq;
32use crate::hash::ArrayHash;
33use crate::serde::ArrayChildren;
34use crate::stats::StatsSetRef;
35use crate::validity::Validity;
36use crate::vtable;
37use crate::vtable::ArrayId;
38use crate::vtable::VTable;
39mod canonical;
40mod operations;
41mod validity;
42vtable!(Chunked);
43
44#[derive(Debug)]
45pub struct ChunkedVTable;
46
47impl ChunkedVTable {
48    pub const ID: ArrayId = ArrayId::new_ref("vortex.chunked");
49}
50
51impl VTable for ChunkedVTable {
52    type Array = ChunkedArray;
53
54    type Metadata = EmptyMetadata;
55    type OperationsVTable = Self;
56    type ValidityVTable = Self;
57    fn id(_array: &Self::Array) -> ArrayId {
58        Self::ID
59    }
60
61    fn len(array: &ChunkedArray) -> usize {
62        array.len
63    }
64
65    fn dtype(array: &ChunkedArray) -> &DType {
66        &array.dtype
67    }
68
69    fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
70        array.stats_set.to_ref(array.as_ref())
71    }
72
73    fn array_hash<H: std::hash::Hasher>(array: &ChunkedArray, state: &mut H, precision: Precision) {
74        array.dtype.hash(state);
75        array.len.hash(state);
76        array.chunk_offsets.as_ref().array_hash(state, precision);
77        for chunk in &array.chunks {
78            chunk.array_hash(state, precision);
79        }
80    }
81
82    fn array_eq(array: &ChunkedArray, other: &ChunkedArray, precision: Precision) -> bool {
83        array.dtype == other.dtype
84            && array.len == other.len
85            && array
86                .chunk_offsets
87                .as_ref()
88                .array_eq(other.chunk_offsets.as_ref(), precision)
89            && array.chunks.len() == other.chunks.len()
90            && array
91                .chunks
92                .iter()
93                .zip(&other.chunks)
94                .all(|(a, b)| a.array_eq(b, precision))
95    }
96
97    fn nbuffers(_array: &ChunkedArray) -> usize {
98        0
99    }
100
101    fn buffer(_array: &ChunkedArray, idx: usize) -> BufferHandle {
102        vortex_panic!("ChunkedArray buffer index {idx} out of bounds")
103    }
104
105    fn buffer_name(_array: &ChunkedArray, idx: usize) -> Option<String> {
106        vortex_panic!("ChunkedArray buffer_name index {idx} out of bounds")
107    }
108
109    fn nchildren(array: &ChunkedArray) -> usize {
110        1 + array.chunks().len()
111    }
112
113    fn child(array: &ChunkedArray, idx: usize) -> ArrayRef {
114        match idx {
115            0 => array.chunk_offsets.to_array(),
116            n => array.chunks()[n - 1].clone(),
117        }
118    }
119
120    fn child_name(_array: &ChunkedArray, idx: usize) -> String {
121        match idx {
122            0 => "chunk_offsets".to_string(),
123            n => format!("chunks[{}]", n - 1),
124        }
125    }
126
127    fn metadata(_array: &ChunkedArray) -> VortexResult<Self::Metadata> {
128        Ok(EmptyMetadata)
129    }
130
131    fn serialize(_metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
132        Ok(Some(vec![]))
133    }
134
135    fn deserialize(
136        _bytes: &[u8],
137        _dtype: &DType,
138        _len: usize,
139        _buffers: &[BufferHandle],
140        _session: &VortexSession,
141    ) -> VortexResult<Self::Metadata> {
142        Ok(EmptyMetadata)
143    }
144
145    fn build(
146        dtype: &DType,
147        _len: usize,
148        _metadata: &Self::Metadata,
149        _buffers: &[BufferHandle],
150        children: &dyn ArrayChildren,
151    ) -> VortexResult<ChunkedArray> {
152        if children.is_empty() {
153            vortex_bail!("Chunked array needs at least one child");
154        }
155
156        let nchunks = children.len() - 1;
157
158        // The first child contains the row offsets of the chunks
159        let chunk_offsets_array = children
160            .get(
161                0,
162                &DType::Primitive(PType::U64, Nullability::NonNullable),
163                // 1 extra offset for the end of the last chunk
164                nchunks + 1,
165            )?
166            .to_primitive();
167
168        let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
169
170        // The remaining children contain the actual data of the chunks
171        let chunks = chunk_offsets_buf
172            .iter()
173            .tuple_windows()
174            .enumerate()
175            .map(|(idx, (start, end))| {
176                let chunk_len = usize::try_from(end - start)
177                    .map_err(|_| vortex_err!("chunk_len {} exceeds usize range", end - start))?;
178                children.get(idx + 1, dtype, chunk_len)
179            })
180            .try_collect()?;
181
182        let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
183
184        let total_len = chunk_offsets_buf
185            .last()
186            .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
187        let len = usize::try_from(*total_len)
188            .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
189
190        // Construct directly using the struct fields to avoid recomputing chunk_offsets
191        Ok(ChunkedArray {
192            dtype: dtype.clone(),
193            len,
194            chunk_offsets,
195            chunks,
196            stats_set: Default::default(),
197        })
198    }
199
200    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
201        // Children: chunk_offsets, then chunks...
202        vortex_ensure!(
203            !children.is_empty(),
204            "Chunked array needs at least one child"
205        );
206
207        let nchunks = children.len() - 1;
208        let chunk_offsets_array = children[0].to_primitive();
209        let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
210
211        vortex_ensure!(
212            chunk_offsets_buf.len() == nchunks + 1,
213            "Expected {} chunk offsets, found {}",
214            nchunks + 1,
215            chunk_offsets_buf.len()
216        );
217
218        let chunks = children.into_iter().skip(1).collect();
219        array.chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
220        array.chunks = chunks;
221
222        let total_len = chunk_offsets_buf
223            .last()
224            .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
225        array.len = usize::try_from(*total_len)
226            .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
227
228        Ok(())
229    }
230
231    fn append_to_builder(
232        array: &ChunkedArray,
233        builder: &mut dyn ArrayBuilder,
234        ctx: &mut ExecutionCtx,
235    ) -> VortexResult<()> {
236        for chunk in array.chunks() {
237            chunk.append_to_builder(builder, ctx)?;
238        }
239        Ok(())
240    }
241
242    fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
243        Ok(_canonicalize(array, ctx)?.into_array())
244    }
245
246    fn reduce(array: &Self::Array) -> VortexResult<Option<ArrayRef>> {
247        Ok(match array.chunks.len() {
248            0 => Some(Canonical::empty(array.dtype()).into_array()),
249            1 => Some(array.chunks[0].clone()),
250            _ => None,
251        })
252    }
253
254    fn reduce_parent(
255        array: &Self::Array,
256        parent: &ArrayRef,
257        child_idx: usize,
258    ) -> VortexResult<Option<ArrayRef>> {
259        PARENT_RULES.evaluate(array, parent, child_idx)
260    }
261
262    fn execute_parent(
263        array: &Self::Array,
264        parent: &ArrayRef,
265        child_idx: usize,
266        ctx: &mut ExecutionCtx,
267    ) -> VortexResult<Option<ArrayRef>> {
268        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
269    }
270}