Skip to main content

vortex_array/arrays/chunked/vtable/
mod.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

4use std::hash::Hash;
5
6use itertools::Itertools;
7use vortex_error::VortexResult;
8use vortex_error::vortex_bail;
9use vortex_error::vortex_ensure;
10use vortex_error::vortex_err;
11use vortex_error::vortex_panic;
12use vortex_session::VortexSession;
13
14use crate::ArrayRef;
15use crate::Canonical;
16use crate::EmptyMetadata;
17use crate::ExecutionCtx;
18use crate::ExecutionStep;
19use crate::IntoArray;
20use crate::Precision;
21use crate::ToCanonical;
22use crate::arrays::ChunkedArray;
23use crate::arrays::PrimitiveArray;
24use crate::arrays::chunked::compute::kernel::PARENT_KERNELS;
25use crate::arrays::chunked::compute::rules::PARENT_RULES;
26use crate::arrays::chunked::vtable::canonical::_canonicalize;
27use crate::buffer::BufferHandle;
28use crate::builders::ArrayBuilder;
29use crate::dtype::DType;
30use crate::dtype::Nullability;
31use crate::dtype::PType;
32use crate::hash::ArrayEq;
33use crate::hash::ArrayHash;
34use crate::serde::ArrayChildren;
35use crate::stats::StatsSetRef;
36use crate::validity::Validity;
37use crate::vtable;
38use crate::vtable::ArrayId;
39use crate::vtable::VTable;
40mod canonical;
41mod operations;
42mod validity;
43vtable!(Chunked);
44
45#[derive(Debug)]
46pub struct ChunkedVTable;
47
48impl ChunkedVTable {
49    pub const ID: ArrayId = ArrayId::new_ref("vortex.chunked");
50}
51
52impl VTable for ChunkedVTable {
53    type Array = ChunkedArray;
54
55    type Metadata = EmptyMetadata;
56    type OperationsVTable = Self;
57    type ValidityVTable = Self;
58    fn id(_array: &Self::Array) -> ArrayId {
59        Self::ID
60    }
61
62    fn len(array: &ChunkedArray) -> usize {
63        array.len
64    }
65
66    fn dtype(array: &ChunkedArray) -> &DType {
67        &array.dtype
68    }
69
70    fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
71        array.stats_set.to_ref(array.as_ref())
72    }
73
74    fn array_hash<H: std::hash::Hasher>(array: &ChunkedArray, state: &mut H, precision: Precision) {
75        array.dtype.hash(state);
76        array.len.hash(state);
77        array.chunk_offsets.as_ref().array_hash(state, precision);
78        for chunk in &array.chunks {
79            chunk.array_hash(state, precision);
80        }
81    }
82
83    fn array_eq(array: &ChunkedArray, other: &ChunkedArray, precision: Precision) -> bool {
84        array.dtype == other.dtype
85            && array.len == other.len
86            && array
87                .chunk_offsets
88                .as_ref()
89                .array_eq(other.chunk_offsets.as_ref(), precision)
90            && array.chunks.len() == other.chunks.len()
91            && array
92                .chunks
93                .iter()
94                .zip(&other.chunks)
95                .all(|(a, b)| a.array_eq(b, precision))
96    }
97
98    fn nbuffers(_array: &ChunkedArray) -> usize {
99        0
100    }
101
102    fn buffer(_array: &ChunkedArray, idx: usize) -> BufferHandle {
103        vortex_panic!("ChunkedArray buffer index {idx} out of bounds")
104    }
105
106    fn buffer_name(_array: &ChunkedArray, idx: usize) -> Option<String> {
107        vortex_panic!("ChunkedArray buffer_name index {idx} out of bounds")
108    }
109
110    fn nchildren(array: &ChunkedArray) -> usize {
111        1 + array.chunks().len()
112    }
113
114    fn child(array: &ChunkedArray, idx: usize) -> ArrayRef {
115        match idx {
116            0 => array.chunk_offsets.clone().into_array(),
117            n => array.chunks()[n - 1].clone(),
118        }
119    }
120
121    fn child_name(_array: &ChunkedArray, idx: usize) -> String {
122        match idx {
123            0 => "chunk_offsets".to_string(),
124            n => format!("chunks[{}]", n - 1),
125        }
126    }
127
128    fn metadata(_array: &ChunkedArray) -> VortexResult<Self::Metadata> {
129        Ok(EmptyMetadata)
130    }
131
132    fn serialize(_metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
133        Ok(Some(vec![]))
134    }
135
136    fn deserialize(
137        _bytes: &[u8],
138        _dtype: &DType,
139        _len: usize,
140        _buffers: &[BufferHandle],
141        _session: &VortexSession,
142    ) -> VortexResult<Self::Metadata> {
143        Ok(EmptyMetadata)
144    }
145
146    fn build(
147        dtype: &DType,
148        _len: usize,
149        _metadata: &Self::Metadata,
150        _buffers: &[BufferHandle],
151        children: &dyn ArrayChildren,
152    ) -> VortexResult<ChunkedArray> {
153        if children.is_empty() {
154            vortex_bail!("Chunked array needs at least one child");
155        }
156
157        let nchunks = children.len() - 1;
158
159        // The first child contains the row offsets of the chunks
160        let chunk_offsets_array = children
161            .get(
162                0,
163                &DType::Primitive(PType::U64, Nullability::NonNullable),
164                // 1 extra offset for the end of the last chunk
165                nchunks + 1,
166            )?
167            .to_primitive();
168
169        let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
170
171        // The remaining children contain the actual data of the chunks
172        let chunks = chunk_offsets_buf
173            .iter()
174            .tuple_windows()
175            .enumerate()
176            .map(|(idx, (start, end))| {
177                let chunk_len = usize::try_from(end - start)
178                    .map_err(|_| vortex_err!("chunk_len {} exceeds usize range", end - start))?;
179                children.get(idx + 1, dtype, chunk_len)
180            })
181            .try_collect()?;
182
183        let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
184
185        let total_len = chunk_offsets_buf
186            .last()
187            .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
188        let len = usize::try_from(*total_len)
189            .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
190
191        // Construct directly using the struct fields to avoid recomputing chunk_offsets
192        Ok(ChunkedArray {
193            dtype: dtype.clone(),
194            len,
195            chunk_offsets,
196            chunks,
197            stats_set: Default::default(),
198        })
199    }
200
201    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
202        // Children: chunk_offsets, then chunks...
203        vortex_ensure!(
204            !children.is_empty(),
205            "Chunked array needs at least one child"
206        );
207
208        let nchunks = children.len() - 1;
209        let chunk_offsets_array = children[0].to_primitive();
210        let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
211
212        vortex_ensure!(
213            chunk_offsets_buf.len() == nchunks + 1,
214            "Expected {} chunk offsets, found {}",
215            nchunks + 1,
216            chunk_offsets_buf.len()
217        );
218
219        let chunks = children.into_iter().skip(1).collect();
220        array.chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
221        array.chunks = chunks;
222
223        let total_len = chunk_offsets_buf
224            .last()
225            .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
226        array.len = usize::try_from(*total_len)
227            .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
228
229        Ok(())
230    }
231
232    fn append_to_builder(
233        array: &ChunkedArray,
234        builder: &mut dyn ArrayBuilder,
235        ctx: &mut ExecutionCtx,
236    ) -> VortexResult<()> {
237        for chunk in array.chunks() {
238            chunk.append_to_builder(builder, ctx)?;
239        }
240        Ok(())
241    }
242
243    fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
244        Ok(ExecutionStep::Done(_canonicalize(array, ctx)?.into_array()))
245    }
246
247    fn reduce(array: &Self::Array) -> VortexResult<Option<ArrayRef>> {
248        Ok(match array.chunks.len() {
249            0 => Some(Canonical::empty(array.dtype()).into_array()),
250            1 => Some(array.chunks[0].clone()),
251            _ => None,
252        })
253    }
254
255    fn reduce_parent(
256        array: &Self::Array,
257        parent: &ArrayRef,
258        child_idx: usize,
259    ) -> VortexResult<Option<ArrayRef>> {
260        PARENT_RULES.evaluate(array, parent, child_idx)
261    }
262
263    fn execute_parent(
264        array: &Self::Array,
265        parent: &ArrayRef,
266        child_idx: usize,
267        ctx: &mut ExecutionCtx,
268    ) -> VortexResult<Option<ArrayRef>> {
269        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
270    }
271}