vortex_array/arrays/chunked/vtable/
mod.rs1use std::hash::Hash;
5
6use itertools::Itertools;
7use vortex_error::VortexResult;
8use vortex_error::vortex_bail;
9use vortex_error::vortex_ensure;
10use vortex_error::vortex_err;
11use vortex_error::vortex_panic;
12use vortex_session::VortexSession;
13
14use crate::ArrayRef;
15use crate::Canonical;
16use crate::EmptyMetadata;
17use crate::ExecutionCtx;
18use crate::ExecutionStep;
19use crate::IntoArray;
20use crate::Precision;
21use crate::ToCanonical;
22use crate::arrays::ChunkedArray;
23use crate::arrays::PrimitiveArray;
24use crate::arrays::chunked::compute::kernel::PARENT_KERNELS;
25use crate::arrays::chunked::compute::rules::PARENT_RULES;
26use crate::arrays::chunked::vtable::canonical::_canonicalize;
27use crate::buffer::BufferHandle;
28use crate::builders::ArrayBuilder;
29use crate::dtype::DType;
30use crate::dtype::Nullability;
31use crate::dtype::PType;
32use crate::hash::ArrayEq;
33use crate::hash::ArrayHash;
34use crate::serde::ArrayChildren;
35use crate::stats::StatsSetRef;
36use crate::validity::Validity;
37use crate::vtable;
38use crate::vtable::ArrayId;
39use crate::vtable::VTable;
// Provides `_canonicalize`, which `ChunkedVTable::execute` uses to canonicalize the chunks.
mod canonical;
// NOTE(review): presumably hosts the `OperationsVTable` impl for `ChunkedVTable`
// (declared as `type OperationsVTable = Self`) — confirm.
mod operations;
// NOTE(review): presumably hosts the `ValidityVTable` impl for `ChunkedVTable`
// (declared as `type ValidityVTable = Self`) — confirm.
mod validity;
// NOTE(review): `vtable!` is a crate macro defined in `crate::vtable`; it presumably generates
// the `Chunked` encoding glue for `ChunkedVTable` — confirm against the macro definition.
vtable!(Chunked);
44
/// Zero-sized vtable type implementing [`VTable`] for [`ChunkedArray`].
#[derive(Debug)]
pub struct ChunkedVTable;

impl ChunkedVTable {
    /// Encoding identifier reported by [`VTable::id`] for every chunked array.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.chunked");
}
51
52impl VTable for ChunkedVTable {
53 type Array = ChunkedArray;
54
55 type Metadata = EmptyMetadata;
56 type OperationsVTable = Self;
57 type ValidityVTable = Self;
58 fn id(_array: &Self::Array) -> ArrayId {
59 Self::ID
60 }
61
62 fn len(array: &ChunkedArray) -> usize {
63 array.len
64 }
65
66 fn dtype(array: &ChunkedArray) -> &DType {
67 &array.dtype
68 }
69
70 fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
71 array.stats_set.to_ref(array.as_ref())
72 }
73
74 fn array_hash<H: std::hash::Hasher>(array: &ChunkedArray, state: &mut H, precision: Precision) {
75 array.dtype.hash(state);
76 array.len.hash(state);
77 array.chunk_offsets.as_ref().array_hash(state, precision);
78 for chunk in &array.chunks {
79 chunk.array_hash(state, precision);
80 }
81 }
82
83 fn array_eq(array: &ChunkedArray, other: &ChunkedArray, precision: Precision) -> bool {
84 array.dtype == other.dtype
85 && array.len == other.len
86 && array
87 .chunk_offsets
88 .as_ref()
89 .array_eq(other.chunk_offsets.as_ref(), precision)
90 && array.chunks.len() == other.chunks.len()
91 && array
92 .chunks
93 .iter()
94 .zip(&other.chunks)
95 .all(|(a, b)| a.array_eq(b, precision))
96 }
97
98 fn nbuffers(_array: &ChunkedArray) -> usize {
99 0
100 }
101
102 fn buffer(_array: &ChunkedArray, idx: usize) -> BufferHandle {
103 vortex_panic!("ChunkedArray buffer index {idx} out of bounds")
104 }
105
106 fn buffer_name(_array: &ChunkedArray, idx: usize) -> Option<String> {
107 vortex_panic!("ChunkedArray buffer_name index {idx} out of bounds")
108 }
109
110 fn nchildren(array: &ChunkedArray) -> usize {
111 1 + array.chunks().len()
112 }
113
114 fn child(array: &ChunkedArray, idx: usize) -> ArrayRef {
115 match idx {
116 0 => array.chunk_offsets.clone().into_array(),
117 n => array.chunks()[n - 1].clone(),
118 }
119 }
120
121 fn child_name(_array: &ChunkedArray, idx: usize) -> String {
122 match idx {
123 0 => "chunk_offsets".to_string(),
124 n => format!("chunks[{}]", n - 1),
125 }
126 }
127
128 fn metadata(_array: &ChunkedArray) -> VortexResult<Self::Metadata> {
129 Ok(EmptyMetadata)
130 }
131
132 fn serialize(_metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
133 Ok(Some(vec![]))
134 }
135
136 fn deserialize(
137 _bytes: &[u8],
138 _dtype: &DType,
139 _len: usize,
140 _buffers: &[BufferHandle],
141 _session: &VortexSession,
142 ) -> VortexResult<Self::Metadata> {
143 Ok(EmptyMetadata)
144 }
145
146 fn build(
147 dtype: &DType,
148 _len: usize,
149 _metadata: &Self::Metadata,
150 _buffers: &[BufferHandle],
151 children: &dyn ArrayChildren,
152 ) -> VortexResult<ChunkedArray> {
153 if children.is_empty() {
154 vortex_bail!("Chunked array needs at least one child");
155 }
156
157 let nchunks = children.len() - 1;
158
159 let chunk_offsets_array = children
161 .get(
162 0,
163 &DType::Primitive(PType::U64, Nullability::NonNullable),
164 nchunks + 1,
166 )?
167 .to_primitive();
168
169 let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
170
171 let chunks = chunk_offsets_buf
173 .iter()
174 .tuple_windows()
175 .enumerate()
176 .map(|(idx, (start, end))| {
177 let chunk_len = usize::try_from(end - start)
178 .map_err(|_| vortex_err!("chunk_len {} exceeds usize range", end - start))?;
179 children.get(idx + 1, dtype, chunk_len)
180 })
181 .try_collect()?;
182
183 let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
184
185 let total_len = chunk_offsets_buf
186 .last()
187 .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
188 let len = usize::try_from(*total_len)
189 .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
190
191 Ok(ChunkedArray {
193 dtype: dtype.clone(),
194 len,
195 chunk_offsets,
196 chunks,
197 stats_set: Default::default(),
198 })
199 }
200
201 fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
202 vortex_ensure!(
204 !children.is_empty(),
205 "Chunked array needs at least one child"
206 );
207
208 let nchunks = children.len() - 1;
209 let chunk_offsets_array = children[0].to_primitive();
210 let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
211
212 vortex_ensure!(
213 chunk_offsets_buf.len() == nchunks + 1,
214 "Expected {} chunk offsets, found {}",
215 nchunks + 1,
216 chunk_offsets_buf.len()
217 );
218
219 let chunks = children.into_iter().skip(1).collect();
220 array.chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
221 array.chunks = chunks;
222
223 let total_len = chunk_offsets_buf
224 .last()
225 .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
226 array.len = usize::try_from(*total_len)
227 .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
228
229 Ok(())
230 }
231
232 fn append_to_builder(
233 array: &ChunkedArray,
234 builder: &mut dyn ArrayBuilder,
235 ctx: &mut ExecutionCtx,
236 ) -> VortexResult<()> {
237 for chunk in array.chunks() {
238 chunk.append_to_builder(builder, ctx)?;
239 }
240 Ok(())
241 }
242
243 fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
244 Ok(ExecutionStep::Done(_canonicalize(array, ctx)?.into_array()))
245 }
246
247 fn reduce(array: &Self::Array) -> VortexResult<Option<ArrayRef>> {
248 Ok(match array.chunks.len() {
249 0 => Some(Canonical::empty(array.dtype()).into_array()),
250 1 => Some(array.chunks[0].clone()),
251 _ => None,
252 })
253 }
254
255 fn reduce_parent(
256 array: &Self::Array,
257 parent: &ArrayRef,
258 child_idx: usize,
259 ) -> VortexResult<Option<ArrayRef>> {
260 PARENT_RULES.evaluate(array, parent, child_idx)
261 }
262
263 fn execute_parent(
264 array: &Self::Array,
265 parent: &ArrayRef,
266 child_idx: usize,
267 ctx: &mut ExecutionCtx,
268 ) -> VortexResult<Option<ArrayRef>> {
269 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
270 }
271}