vortex_array/arrays/chunked/vtable/
mod.rs1use std::hash::Hasher;
5
6use itertools::Itertools;
7use vortex_error::VortexExpect;
8use vortex_error::VortexResult;
9use vortex_error::vortex_bail;
10use vortex_error::vortex_ensure;
11use vortex_error::vortex_err;
12use vortex_error::vortex_panic;
13use vortex_session::VortexSession;
14use vortex_session::registry::CachedId;
15
16use crate::ArrayEq;
17use crate::ArrayHash;
18use crate::ArrayRef;
19use crate::Canonical;
20use crate::ExecutionCtx;
21use crate::ExecutionResult;
22use crate::IntoArray;
23use crate::Precision;
24use crate::ToCanonical;
25use crate::array::Array;
26use crate::array::ArrayId;
27use crate::array::ArrayParts;
28use crate::array::ArrayView;
29use crate::array::VTable;
30use crate::arrays::chunked::ChunkedArrayExt;
31use crate::arrays::chunked::ChunkedData;
32use crate::arrays::chunked::array::CHUNK_OFFSETS_SLOT;
33use crate::arrays::chunked::array::CHUNKS_OFFSET;
34use crate::arrays::chunked::compute::kernel::PARENT_KERNELS;
35use crate::arrays::chunked::compute::rules::PARENT_RULES;
36use crate::arrays::chunked::vtable::canonical::_canonicalize;
37use crate::buffer::BufferHandle;
38use crate::builders::ArrayBuilder;
39use crate::dtype::DType;
40use crate::dtype::Nullability;
41use crate::dtype::PType;
42use crate::serde::ArrayChildren;
43mod canonical;
44mod operations;
45mod validity;
46pub type ChunkedArray = Array<Chunked>;
48
/// Zero-sized marker type naming the chunked encoding.
///
/// It carries no state of its own; the trait implementations in this module
/// (notably [`VTable`]) are attached to it.
#[derive(Debug, Clone)]
pub struct Chunked;
51
52impl ArrayHash for ChunkedData {
53 fn array_hash<H: Hasher>(&self, _state: &mut H, _precision: Precision) {
54 }
57}
58
59impl ArrayEq for ChunkedData {
60 fn array_eq(&self, _other: &Self, _precision: Precision) -> bool {
61 true
64 }
65}
66
67impl VTable for Chunked {
68 type ArrayData = ChunkedData;
69
70 type OperationsVTable = Self;
71 type ValidityVTable = Self;
72 fn id(&self) -> ArrayId {
73 static ID: CachedId = CachedId::new("vortex.chunked");
74 *ID
75 }
76
77 fn validate(
78 &self,
79 data: &ChunkedData,
80 dtype: &DType,
81 len: usize,
82 slots: &[Option<ArrayRef>],
83 ) -> VortexResult<()> {
84 vortex_ensure!(
85 !slots.is_empty(),
86 "ChunkedArray must have at least a chunk offsets slot"
87 );
88 let chunk_offsets = slots[CHUNK_OFFSETS_SLOT]
89 .as_ref()
90 .vortex_expect("validated chunk offsets slot");
91 vortex_ensure!(
92 chunk_offsets.dtype() == &DType::Primitive(PType::U64, Nullability::NonNullable),
93 "ChunkedArray chunk offsets must be non-nullable u64, found {}",
94 chunk_offsets.dtype()
95 );
96 vortex_ensure!(
97 chunk_offsets.len() == data.chunk_offsets.len(),
98 "ChunkedArray chunk offsets slot length {} does not match cached offsets length {}",
99 chunk_offsets.len(),
100 data.chunk_offsets.len()
101 );
102 vortex_ensure!(
103 data.chunk_offsets.len() == slots.len() - CHUNKS_OFFSET + 1,
104 "ChunkedArray chunk offsets length {} does not match {} chunks",
105 data.chunk_offsets.len(),
106 slots.len() - CHUNKS_OFFSET
107 );
108 vortex_ensure!(
109 data.chunk_offsets
110 .last()
111 .copied()
112 .vortex_expect("chunked arrays always have a leading 0 offset")
113 == len,
114 "ChunkedArray length {} does not match outer length {}",
115 data.chunk_offsets.last().copied().unwrap_or_default(),
116 len
117 );
118 for (idx, (start, end)) in data
119 .chunk_offsets
120 .iter()
121 .copied()
122 .tuple_windows()
123 .enumerate()
124 {
125 let chunk = slots[CHUNKS_OFFSET + idx]
126 .as_ref()
127 .vortex_expect("validated chunk slot");
128 vortex_ensure!(
129 chunk.dtype() == dtype,
130 "ChunkedArray chunk dtype {} does not match outer dtype {}",
131 chunk.dtype(),
132 dtype
133 );
134 vortex_ensure!(
135 chunk.len() == end - start,
136 "ChunkedArray chunk {} len {} does not match offsets span {}",
137 idx,
138 chunk.len(),
139 end - start
140 );
141 }
142 Ok(())
143 }
144
145 fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
146 0
147 }
148
149 fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
150 vortex_panic!("ChunkedArray buffer index {idx} out of bounds")
151 }
152
153 fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
154 vortex_panic!("ChunkedArray buffer_name index {idx} out of bounds")
155 }
156
157 fn serialize(
158 _array: ArrayView<'_, Self>,
159 _session: &VortexSession,
160 ) -> VortexResult<Option<Vec<u8>>> {
161 Ok(Some(vec![]))
162 }
163
164 fn deserialize(
165 &self,
166 dtype: &DType,
167 len: usize,
168 metadata: &[u8],
169 _buffers: &[BufferHandle],
170 children: &dyn ArrayChildren,
171 _session: &VortexSession,
172 ) -> VortexResult<ArrayParts<Self>> {
173 if !metadata.is_empty() {
174 vortex_bail!(
175 "ChunkedArray expects empty metadata, got {} bytes",
176 metadata.len()
177 );
178 }
179 if children.is_empty() {
180 vortex_bail!("Chunked array needs at least one child");
181 }
182
183 let nchunks = children.len() - 1;
184 let chunk_offsets = children.get(
185 CHUNK_OFFSETS_SLOT,
186 &DType::Primitive(PType::U64, Nullability::NonNullable),
187 nchunks + 1,
188 )?;
189 let chunk_offsets_buf = chunk_offsets.to_primitive().to_buffer::<u64>();
190 let chunk_offsets_usize = chunk_offsets_buf
191 .iter()
192 .copied()
193 .map(|offset| {
194 usize::try_from(offset)
195 .map_err(|_| vortex_err!("chunk offset {offset} exceeds usize range"))
196 })
197 .collect::<VortexResult<Vec<_>>>()?;
198 let mut slots = Vec::with_capacity(children.len());
199 slots.push(Some(chunk_offsets));
200 for (idx, (start, end)) in chunk_offsets_usize
201 .iter()
202 .copied()
203 .tuple_windows()
204 .enumerate()
205 {
206 let chunk_len = end - start;
207 slots.push(Some(children.get(idx + CHUNKS_OFFSET, dtype, chunk_len)?));
208 }
209
210 Ok(ArrayParts::new(
211 self.clone(),
212 dtype.clone(),
213 len,
214 ChunkedData {
215 chunk_offsets: chunk_offsets_usize,
216 },
217 )
218 .with_slots(slots))
219 }
220
221 fn append_to_builder(
222 array: ArrayView<'_, Self>,
223 builder: &mut dyn ArrayBuilder,
224 ctx: &mut ExecutionCtx,
225 ) -> VortexResult<()> {
226 for chunk in array.iter_chunks() {
227 chunk.append_to_builder(builder, ctx)?;
228 }
229 Ok(())
230 }
231
232 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
233 match idx {
234 CHUNK_OFFSETS_SLOT => "chunk_offsets".to_string(),
235 n => format!("chunks[{}]", n - CHUNKS_OFFSET),
236 }
237 }
238
239 fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
240 Ok(ExecutionResult::done(_canonicalize(array.as_view(), ctx)?))
241 }
242
243 fn execute_parent(
244 array: ArrayView<'_, Self>,
245 parent: &ArrayRef,
246 child_idx: usize,
247 ctx: &mut ExecutionCtx,
248 ) -> VortexResult<Option<ArrayRef>> {
249 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
250 }
251
252 fn reduce(array: ArrayView<'_, Self>) -> VortexResult<Option<ArrayRef>> {
253 Ok(match array.nchunks() {
254 0 => Some(Canonical::empty(array.dtype()).into_array()),
255 1 => Some(array.chunk(0).clone()),
256 _ => None,
257 })
258 }
259
260 fn reduce_parent(
261 array: ArrayView<'_, Self>,
262 parent: &ArrayRef,
263 child_idx: usize,
264 ) -> VortexResult<Option<ArrayRef>> {
265 PARENT_RULES.evaluate(array, parent, child_idx)
266 }
267}