Skip to main content

vortex_array/arrays/chunked/vtable/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::hash::Hash;
5use std::sync::Arc;
6
7use itertools::Itertools;
8use vortex_error::VortexResult;
9use vortex_error::vortex_bail;
10use vortex_error::vortex_ensure;
11use vortex_error::vortex_err;
12use vortex_error::vortex_panic;
13use vortex_session::VortexSession;
14
15use crate::ArrayRef;
16use crate::Canonical;
17use crate::EmptyMetadata;
18use crate::ExecutionCtx;
19use crate::ExecutionResult;
20use crate::IntoArray;
21use crate::Precision;
22use crate::ToCanonical;
23use crate::arrays::ChunkedArray;
24use crate::arrays::PrimitiveArray;
25use crate::arrays::chunked::compute::kernel::PARENT_KERNELS;
26use crate::arrays::chunked::compute::rules::PARENT_RULES;
27use crate::arrays::chunked::vtable::canonical::_canonicalize;
28use crate::buffer::BufferHandle;
29use crate::builders::ArrayBuilder;
30use crate::dtype::DType;
31use crate::dtype::Nullability;
32use crate::dtype::PType;
33use crate::hash::ArrayEq;
34use crate::hash::ArrayHash;
35use crate::serde::ArrayChildren;
36use crate::stats::StatsSetRef;
37use crate::validity::Validity;
38use crate::vtable;
39use crate::vtable::ArrayId;
40use crate::vtable::VTable;
41mod canonical;
42mod operations;
43mod validity;
44vtable!(Chunked);
45
/// Zero-sized vtable type for chunked arrays.
///
/// Carries no state of its own; it exists so that the chunked encoding's behaviour can be
/// attached to a type.
#[derive(Debug, Clone)]
pub struct Chunked;
48
49impl Chunked {
50    pub const ID: ArrayId = ArrayId::new_ref("vortex.chunked");
51}
52
53impl VTable for Chunked {
54    type Array = ChunkedArray;
55
56    type Metadata = EmptyMetadata;
57    type OperationsVTable = Self;
58    type ValidityVTable = Self;
59    fn vtable(_array: &Self::Array) -> &Self {
60        &Chunked
61    }
62
63    fn id(&self) -> ArrayId {
64        Self::ID
65    }
66
67    fn len(array: &ChunkedArray) -> usize {
68        array.len
69    }
70
71    fn dtype(array: &ChunkedArray) -> &DType {
72        &array.dtype
73    }
74
75    fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
76        array.stats_set.to_ref(array.as_ref())
77    }
78
79    fn array_hash<H: std::hash::Hasher>(array: &ChunkedArray, state: &mut H, precision: Precision) {
80        array.dtype.hash(state);
81        array.len.hash(state);
82        array.chunk_offsets.as_ref().array_hash(state, precision);
83        for chunk in &array.chunks {
84            chunk.array_hash(state, precision);
85        }
86    }
87
88    fn array_eq(array: &ChunkedArray, other: &ChunkedArray, precision: Precision) -> bool {
89        array.dtype == other.dtype
90            && array.len == other.len
91            && array
92                .chunk_offsets
93                .as_ref()
94                .array_eq(other.chunk_offsets.as_ref(), precision)
95            && array.chunks.len() == other.chunks.len()
96            && array
97                .chunks
98                .iter()
99                .zip(&other.chunks)
100                .all(|(a, b)| a.array_eq(b, precision))
101    }
102
103    fn nbuffers(_array: &ChunkedArray) -> usize {
104        0
105    }
106
107    fn buffer(_array: &ChunkedArray, idx: usize) -> BufferHandle {
108        vortex_panic!("ChunkedArray buffer index {idx} out of bounds")
109    }
110
111    fn buffer_name(_array: &ChunkedArray, idx: usize) -> Option<String> {
112        vortex_panic!("ChunkedArray buffer_name index {idx} out of bounds")
113    }
114
115    fn nchildren(array: &ChunkedArray) -> usize {
116        1 + array.chunks().len()
117    }
118
119    fn child(array: &ChunkedArray, idx: usize) -> ArrayRef {
120        match idx {
121            0 => array.chunk_offsets.clone().into_array(),
122            n => array.chunks()[n - 1].clone(),
123        }
124    }
125
126    fn child_name(_array: &ChunkedArray, idx: usize) -> String {
127        match idx {
128            0 => "chunk_offsets".to_string(),
129            n => format!("chunks[{}]", n - 1),
130        }
131    }
132
133    fn metadata(_array: &ChunkedArray) -> VortexResult<Self::Metadata> {
134        Ok(EmptyMetadata)
135    }
136
137    fn serialize(_metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
138        Ok(Some(vec![]))
139    }
140
141    fn deserialize(
142        _bytes: &[u8],
143        _dtype: &DType,
144        _len: usize,
145        _buffers: &[BufferHandle],
146        _session: &VortexSession,
147    ) -> VortexResult<Self::Metadata> {
148        Ok(EmptyMetadata)
149    }
150
151    fn build(
152        dtype: &DType,
153        _len: usize,
154        _metadata: &Self::Metadata,
155        _buffers: &[BufferHandle],
156        children: &dyn ArrayChildren,
157    ) -> VortexResult<ChunkedArray> {
158        if children.is_empty() {
159            vortex_bail!("Chunked array needs at least one child");
160        }
161
162        let nchunks = children.len() - 1;
163
164        // The first child contains the row offsets of the chunks
165        let chunk_offsets_array = children
166            .get(
167                0,
168                &DType::Primitive(PType::U64, Nullability::NonNullable),
169                // 1 extra offset for the end of the last chunk
170                nchunks + 1,
171            )?
172            .to_primitive();
173
174        let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
175
176        // The remaining children contain the actual data of the chunks
177        let chunks = chunk_offsets_buf
178            .iter()
179            .tuple_windows()
180            .enumerate()
181            .map(|(idx, (start, end))| {
182                let chunk_len = usize::try_from(end - start)
183                    .map_err(|_| vortex_err!("chunk_len {} exceeds usize range", end - start))?;
184                children.get(idx + 1, dtype, chunk_len)
185            })
186            .try_collect()?;
187
188        let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
189
190        let total_len = chunk_offsets_buf
191            .last()
192            .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
193        let len = usize::try_from(*total_len)
194            .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
195
196        // Construct directly using the struct fields to avoid recomputing chunk_offsets
197        Ok(ChunkedArray {
198            dtype: dtype.clone(),
199            len,
200            chunk_offsets,
201            chunks,
202            stats_set: Default::default(),
203        })
204    }
205
206    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
207        // Children: chunk_offsets, then chunks...
208        vortex_ensure!(
209            !children.is_empty(),
210            "Chunked array needs at least one child"
211        );
212
213        let nchunks = children.len() - 1;
214        let chunk_offsets_array = children[0].to_primitive();
215        let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
216
217        vortex_ensure!(
218            chunk_offsets_buf.len() == nchunks + 1,
219            "Expected {} chunk offsets, found {}",
220            nchunks + 1,
221            chunk_offsets_buf.len()
222        );
223
224        let chunks = children.into_iter().skip(1).collect();
225        array.chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
226        array.chunks = chunks;
227
228        let total_len = chunk_offsets_buf
229            .last()
230            .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
231        array.len = usize::try_from(*total_len)
232            .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
233
234        Ok(())
235    }
236
237    fn append_to_builder(
238        array: &ChunkedArray,
239        builder: &mut dyn ArrayBuilder,
240        ctx: &mut ExecutionCtx,
241    ) -> VortexResult<()> {
242        for chunk in array.chunks() {
243            chunk.append_to_builder(builder, ctx)?;
244        }
245        Ok(())
246    }
247
248    fn execute(array: Arc<Self::Array>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
249        Ok(ExecutionResult::done(
250            _canonicalize(&array, ctx)?.into_array(),
251        ))
252    }
253
254    fn reduce(array: &Self::Array) -> VortexResult<Option<ArrayRef>> {
255        Ok(match array.chunks.len() {
256            0 => Some(Canonical::empty(array.dtype()).into_array()),
257            1 => Some(array.chunks[0].clone()),
258            _ => None,
259        })
260    }
261
262    fn reduce_parent(
263        array: &Self::Array,
264        parent: &ArrayRef,
265        child_idx: usize,
266    ) -> VortexResult<Option<ArrayRef>> {
267        PARENT_RULES.evaluate(array, parent, child_idx)
268    }
269
270    fn execute_parent(
271        array: &Self::Array,
272        parent: &ArrayRef,
273        child_idx: usize,
274        ctx: &mut ExecutionCtx,
275    ) -> VortexResult<Option<ArrayRef>> {
276        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
277    }
278}