Skip to main content

vortex_array/arrays/chunked/vtable/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::hash::Hash;
5use std::sync::Arc;
6
7use itertools::Itertools;
8use vortex_error::VortexResult;
9use vortex_error::vortex_bail;
10use vortex_error::vortex_ensure;
11use vortex_error::vortex_err;
12use vortex_error::vortex_panic;
13use vortex_session::VortexSession;
14
15use crate::ArrayRef;
16use crate::Canonical;
17use crate::EmptyMetadata;
18use crate::ExecutionCtx;
19use crate::ExecutionResult;
20use crate::IntoArray;
21use crate::Precision;
22use crate::ToCanonical;
23use crate::arrays::ChunkedArray;
24use crate::arrays::PrimitiveArray;
25use crate::arrays::chunked::compute::kernel::PARENT_KERNELS;
26use crate::arrays::chunked::compute::rules::PARENT_RULES;
27use crate::arrays::chunked::vtable::canonical::_canonicalize;
28use crate::buffer::BufferHandle;
29use crate::builders::ArrayBuilder;
30use crate::dtype::DType;
31use crate::dtype::Nullability;
32use crate::dtype::PType;
33use crate::hash::ArrayEq;
34use crate::hash::ArrayHash;
35use crate::serde::ArrayChildren;
36use crate::stats::StatsSetRef;
37use crate::validity::Validity;
38use crate::vtable;
39use crate::vtable::Array;
40use crate::vtable::ArrayId;
41use crate::vtable::VTable;
42mod canonical;
43mod operations;
44mod validity;
45vtable!(Chunked);
46
/// Zero-sized marker type that carries the vtable implementation for
/// [`ChunkedArray`].
#[derive(Debug, Clone)]
pub struct Chunked;
49
50impl Chunked {
51    pub const ID: ArrayId = ArrayId::new_ref("vortex.chunked");
52}
53
54impl VTable for Chunked {
55    type Array = ChunkedArray;
56
57    type Metadata = EmptyMetadata;
58    type OperationsVTable = Self;
59    type ValidityVTable = Self;
60    fn vtable(_array: &Self::Array) -> &Self {
61        &Chunked
62    }
63
64    fn id(&self) -> ArrayId {
65        Self::ID
66    }
67
68    fn len(array: &ChunkedArray) -> usize {
69        array.len
70    }
71
72    fn dtype(array: &ChunkedArray) -> &DType {
73        &array.dtype
74    }
75
76    fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
77        array.stats_set.to_ref(array.as_ref())
78    }
79
80    fn array_hash<H: std::hash::Hasher>(array: &ChunkedArray, state: &mut H, precision: Precision) {
81        array.dtype.hash(state);
82        array.len.hash(state);
83        array.chunk_offsets.as_ref().array_hash(state, precision);
84        for chunk in &array.chunks {
85            chunk.array_hash(state, precision);
86        }
87    }
88
89    fn array_eq(array: &ChunkedArray, other: &ChunkedArray, precision: Precision) -> bool {
90        array.dtype == other.dtype
91            && array.len == other.len
92            && array
93                .chunk_offsets
94                .as_ref()
95                .array_eq(other.chunk_offsets.as_ref(), precision)
96            && array.chunks.len() == other.chunks.len()
97            && array
98                .chunks
99                .iter()
100                .zip(&other.chunks)
101                .all(|(a, b)| a.array_eq(b, precision))
102    }
103
104    fn nbuffers(_array: &ChunkedArray) -> usize {
105        0
106    }
107
108    fn buffer(_array: &ChunkedArray, idx: usize) -> BufferHandle {
109        vortex_panic!("ChunkedArray buffer index {idx} out of bounds")
110    }
111
112    fn buffer_name(_array: &ChunkedArray, idx: usize) -> Option<String> {
113        vortex_panic!("ChunkedArray buffer_name index {idx} out of bounds")
114    }
115
116    fn nchildren(array: &ChunkedArray) -> usize {
117        1 + array.chunks().len()
118    }
119
120    fn child(array: &ChunkedArray, idx: usize) -> ArrayRef {
121        match idx {
122            0 => array.chunk_offsets.clone().into_array(),
123            n => array.chunks()[n - 1].clone(),
124        }
125    }
126
127    fn child_name(_array: &ChunkedArray, idx: usize) -> String {
128        match idx {
129            0 => "chunk_offsets".to_string(),
130            n => format!("chunks[{}]", n - 1),
131        }
132    }
133
134    fn metadata(_array: &ChunkedArray) -> VortexResult<Self::Metadata> {
135        Ok(EmptyMetadata)
136    }
137
138    fn serialize(_metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
139        Ok(Some(vec![]))
140    }
141
142    fn deserialize(
143        _bytes: &[u8],
144        _dtype: &DType,
145        _len: usize,
146        _buffers: &[BufferHandle],
147        _session: &VortexSession,
148    ) -> VortexResult<Self::Metadata> {
149        Ok(EmptyMetadata)
150    }
151
152    fn build(
153        dtype: &DType,
154        _len: usize,
155        _metadata: &Self::Metadata,
156        _buffers: &[BufferHandle],
157        children: &dyn ArrayChildren,
158    ) -> VortexResult<ChunkedArray> {
159        if children.is_empty() {
160            vortex_bail!("Chunked array needs at least one child");
161        }
162
163        let nchunks = children.len() - 1;
164
165        // The first child contains the row offsets of the chunks
166        let chunk_offsets_array = children
167            .get(
168                0,
169                &DType::Primitive(PType::U64, Nullability::NonNullable),
170                // 1 extra offset for the end of the last chunk
171                nchunks + 1,
172            )?
173            .to_primitive();
174
175        let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
176
177        // The remaining children contain the actual data of the chunks
178        let chunks = chunk_offsets_buf
179            .iter()
180            .tuple_windows()
181            .enumerate()
182            .map(|(idx, (start, end))| {
183                let chunk_len = usize::try_from(end - start)
184                    .map_err(|_| vortex_err!("chunk_len {} exceeds usize range", end - start))?;
185                children.get(idx + 1, dtype, chunk_len)
186            })
187            .try_collect()?;
188
189        let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
190
191        let total_len = chunk_offsets_buf
192            .last()
193            .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
194        let len = usize::try_from(*total_len)
195            .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
196
197        // Construct directly using the struct fields to avoid recomputing chunk_offsets
198        Ok(ChunkedArray {
199            dtype: dtype.clone(),
200            len,
201            chunk_offsets,
202            chunks,
203            stats_set: Default::default(),
204        })
205    }
206
207    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
208        // Children: chunk_offsets, then chunks...
209        vortex_ensure!(
210            !children.is_empty(),
211            "Chunked array needs at least one child"
212        );
213
214        let nchunks = children.len() - 1;
215        let chunk_offsets_array = children[0].to_primitive();
216        let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
217
218        vortex_ensure!(
219            chunk_offsets_buf.len() == nchunks + 1,
220            "Expected {} chunk offsets, found {}",
221            nchunks + 1,
222            chunk_offsets_buf.len()
223        );
224
225        let chunks = children.into_iter().skip(1).collect();
226        array.chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
227        array.chunks = chunks;
228
229        let total_len = chunk_offsets_buf
230            .last()
231            .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
232        array.len = usize::try_from(*total_len)
233            .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
234
235        Ok(())
236    }
237
238    fn append_to_builder(
239        array: &ChunkedArray,
240        builder: &mut dyn ArrayBuilder,
241        ctx: &mut ExecutionCtx,
242    ) -> VortexResult<()> {
243        for chunk in array.chunks() {
244            chunk.append_to_builder(builder, ctx)?;
245        }
246        Ok(())
247    }
248
249    fn execute(array: Arc<Array<Self>>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
250        Ok(ExecutionResult::done(
251            _canonicalize(&array, ctx)?.into_array(),
252        ))
253    }
254
255    fn reduce(array: &Array<Self>) -> VortexResult<Option<ArrayRef>> {
256        Ok(match array.chunks.len() {
257            0 => Some(Canonical::empty(array.dtype()).into_array()),
258            1 => Some(array.chunks[0].clone()),
259            _ => None,
260        })
261    }
262
263    fn reduce_parent(
264        array: &Array<Self>,
265        parent: &ArrayRef,
266        child_idx: usize,
267    ) -> VortexResult<Option<ArrayRef>> {
268        PARENT_RULES.evaluate(array, parent, child_idx)
269    }
270
271    fn execute_parent(
272        array: &Array<Self>,
273        parent: &ArrayRef,
274        child_idx: usize,
275        ctx: &mut ExecutionCtx,
276    ) -> VortexResult<Option<ArrayRef>> {
277        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
278    }
279}