Skip to main content

vortex_array/arrays/chunked/vtable/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use itertools::Itertools;
5use vortex_dtype::DType;
6use vortex_dtype::Nullability;
7use vortex_dtype::PType;
8use vortex_error::VortexResult;
9use vortex_error::vortex_bail;
10use vortex_error::vortex_ensure;
11use vortex_error::vortex_err;
12use vortex_session::VortexSession;
13
14use crate::ArrayRef;
15use crate::Canonical;
16use crate::EmptyMetadata;
17use crate::ExecutionCtx;
18use crate::IntoArray;
19use crate::ToCanonical;
20use crate::arrays::ChunkedArray;
21use crate::arrays::PrimitiveArray;
22use crate::arrays::chunked::compute::kernel::PARENT_KERNELS;
23use crate::arrays::chunked::compute::rules::PARENT_RULES;
24use crate::arrays::chunked::vtable::canonical::_canonicalize;
25use crate::buffer::BufferHandle;
26use crate::builders::ArrayBuilder;
27use crate::serde::ArrayChildren;
28use crate::validity::Validity;
29use crate::vtable;
30use crate::vtable::ArrayId;
31use crate::vtable::VTable;
32
33mod array;
34mod canonical;
35mod operations;
36mod validity;
37mod visitor;
38
// NOTE(review): presumably expands to the boilerplate that ties `ChunkedArray` to
// `ChunkedVTable` (the macro definition is not visible here) -- confirm against `crate::vtable!`.
vtable!(Chunked);
40
/// Zero-sized marker type that carries the [`VTable`] implementation for [`ChunkedArray`].
///
/// The same type is also named as the array, operations, validity and visitor vtable in
/// its [`VTable`] impl.
#[derive(Debug)]
pub struct ChunkedVTable;
43
impl ChunkedVTable {
    /// Encoding identifier (`"vortex.chunked"`) reported by [`VTable::id`] for chunked arrays.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.chunked");
}
47
48impl VTable for ChunkedVTable {
49    type Array = ChunkedArray;
50
51    type Metadata = EmptyMetadata;
52
53    type ArrayVTable = Self;
54    type OperationsVTable = Self;
55    type ValidityVTable = Self;
56    type VisitorVTable = Self;
57
58    fn id(_array: &Self::Array) -> ArrayId {
59        Self::ID
60    }
61
62    fn metadata(_array: &ChunkedArray) -> VortexResult<Self::Metadata> {
63        Ok(EmptyMetadata)
64    }
65
66    fn serialize(_metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
67        Ok(Some(vec![]))
68    }
69
70    fn deserialize(
71        _bytes: &[u8],
72        _dtype: &DType,
73        _len: usize,
74        _buffers: &[BufferHandle],
75        _session: &VortexSession,
76    ) -> VortexResult<Self::Metadata> {
77        Ok(EmptyMetadata)
78    }
79
80    fn build(
81        dtype: &DType,
82        _len: usize,
83        _metadata: &Self::Metadata,
84        _buffers: &[BufferHandle],
85        children: &dyn ArrayChildren,
86    ) -> VortexResult<ChunkedArray> {
87        if children.is_empty() {
88            vortex_bail!("Chunked array needs at least one child");
89        }
90
91        let nchunks = children.len() - 1;
92
93        // The first child contains the row offsets of the chunks
94        let chunk_offsets_array = children
95            .get(
96                0,
97                &DType::Primitive(PType::U64, Nullability::NonNullable),
98                // 1 extra offset for the end of the last chunk
99                nchunks + 1,
100            )?
101            .to_primitive();
102
103        let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
104
105        // The remaining children contain the actual data of the chunks
106        let chunks = chunk_offsets_buf
107            .iter()
108            .tuple_windows()
109            .enumerate()
110            .map(|(idx, (start, end))| {
111                let chunk_len = usize::try_from(end - start)
112                    .map_err(|_| vortex_err!("chunk_len {} exceeds usize range", end - start))?;
113                children.get(idx + 1, dtype, chunk_len)
114            })
115            .try_collect()?;
116
117        let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
118
119        let total_len = chunk_offsets_buf
120            .last()
121            .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
122        let len = usize::try_from(*total_len)
123            .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
124
125        // Construct directly using the struct fields to avoid recomputing chunk_offsets
126        Ok(ChunkedArray {
127            dtype: dtype.clone(),
128            len,
129            chunk_offsets,
130            chunks,
131            stats_set: Default::default(),
132        })
133    }
134
135    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
136        // Children: chunk_offsets, then chunks...
137        vortex_ensure!(
138            !children.is_empty(),
139            "Chunked array needs at least one child"
140        );
141
142        let nchunks = children.len() - 1;
143        let chunk_offsets_array = children[0].to_primitive();
144        let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
145
146        vortex_ensure!(
147            chunk_offsets_buf.len() == nchunks + 1,
148            "Expected {} chunk offsets, found {}",
149            nchunks + 1,
150            chunk_offsets_buf.len()
151        );
152
153        let chunks = children.into_iter().skip(1).collect();
154        array.chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
155        array.chunks = chunks;
156
157        let total_len = chunk_offsets_buf
158            .last()
159            .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
160        array.len = usize::try_from(*total_len)
161            .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
162
163        Ok(())
164    }
165
166    fn append_to_builder(
167        array: &ChunkedArray,
168        builder: &mut dyn ArrayBuilder,
169        ctx: &mut ExecutionCtx,
170    ) -> VortexResult<()> {
171        for chunk in array.chunks() {
172            chunk.append_to_builder(builder, ctx)?;
173        }
174        Ok(())
175    }
176
177    fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
178        Ok(_canonicalize(array, ctx)?.into_array())
179    }
180
181    fn reduce(array: &Self::Array) -> VortexResult<Option<ArrayRef>> {
182        Ok(match array.chunks.len() {
183            0 => Some(Canonical::empty(array.dtype()).into_array()),
184            1 => Some(array.chunks[0].clone()),
185            _ => None,
186        })
187    }
188
189    fn reduce_parent(
190        array: &Self::Array,
191        parent: &ArrayRef,
192        child_idx: usize,
193    ) -> VortexResult<Option<ArrayRef>> {
194        PARENT_RULES.evaluate(array, parent, child_idx)
195    }
196
197    fn execute_parent(
198        array: &Self::Array,
199        parent: &ArrayRef,
200        child_idx: usize,
201        ctx: &mut ExecutionCtx,
202    ) -> VortexResult<Option<ArrayRef>> {
203        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
204    }
205}