vortex_array/arrays/chunked/vtable/
mod.rs1use std::hash::Hash;
5
6use itertools::Itertools;
7use vortex_error::VortexResult;
8use vortex_error::vortex_bail;
9use vortex_error::vortex_ensure;
10use vortex_error::vortex_err;
11use vortex_error::vortex_panic;
12use vortex_session::VortexSession;
13
14use crate::ArrayRef;
15use crate::Canonical;
16use crate::EmptyMetadata;
17use crate::ExecutionCtx;
18use crate::IntoArray;
19use crate::Precision;
20use crate::ToCanonical;
21use crate::arrays::ChunkedArray;
22use crate::arrays::PrimitiveArray;
23use crate::arrays::chunked::compute::kernel::PARENT_KERNELS;
24use crate::arrays::chunked::compute::rules::PARENT_RULES;
25use crate::arrays::chunked::vtable::canonical::_canonicalize;
26use crate::buffer::BufferHandle;
27use crate::builders::ArrayBuilder;
28use crate::dtype::DType;
29use crate::dtype::Nullability;
30use crate::dtype::PType;
31use crate::hash::ArrayEq;
32use crate::hash::ArrayHash;
33use crate::serde::ArrayChildren;
34use crate::stats::StatsSetRef;
35use crate::validity::Validity;
36use crate::vtable;
37use crate::vtable::ArrayId;
38use crate::vtable::VTable;
39mod canonical;
40mod operations;
41mod validity;
42vtable!(Chunked);
43
/// Unit type that carries the [`VTable`] implementation for [`ChunkedArray`].
#[derive(Debug)]
pub struct ChunkedVTable;
46
47impl ChunkedVTable {
48 pub const ID: ArrayId = ArrayId::new_ref("vortex.chunked");
49}
50
51impl VTable for ChunkedVTable {
52 type Array = ChunkedArray;
53
54 type Metadata = EmptyMetadata;
55 type OperationsVTable = Self;
56 type ValidityVTable = Self;
57 fn id(_array: &Self::Array) -> ArrayId {
58 Self::ID
59 }
60
61 fn len(array: &ChunkedArray) -> usize {
62 array.len
63 }
64
65 fn dtype(array: &ChunkedArray) -> &DType {
66 &array.dtype
67 }
68
69 fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
70 array.stats_set.to_ref(array.as_ref())
71 }
72
73 fn array_hash<H: std::hash::Hasher>(array: &ChunkedArray, state: &mut H, precision: Precision) {
74 array.dtype.hash(state);
75 array.len.hash(state);
76 array.chunk_offsets.as_ref().array_hash(state, precision);
77 for chunk in &array.chunks {
78 chunk.array_hash(state, precision);
79 }
80 }
81
82 fn array_eq(array: &ChunkedArray, other: &ChunkedArray, precision: Precision) -> bool {
83 array.dtype == other.dtype
84 && array.len == other.len
85 && array
86 .chunk_offsets
87 .as_ref()
88 .array_eq(other.chunk_offsets.as_ref(), precision)
89 && array.chunks.len() == other.chunks.len()
90 && array
91 .chunks
92 .iter()
93 .zip(&other.chunks)
94 .all(|(a, b)| a.array_eq(b, precision))
95 }
96
97 fn nbuffers(_array: &ChunkedArray) -> usize {
98 0
99 }
100
101 fn buffer(_array: &ChunkedArray, idx: usize) -> BufferHandle {
102 vortex_panic!("ChunkedArray buffer index {idx} out of bounds")
103 }
104
105 fn buffer_name(_array: &ChunkedArray, idx: usize) -> Option<String> {
106 vortex_panic!("ChunkedArray buffer_name index {idx} out of bounds")
107 }
108
109 fn nchildren(array: &ChunkedArray) -> usize {
110 1 + array.chunks().len()
111 }
112
113 fn child(array: &ChunkedArray, idx: usize) -> ArrayRef {
114 match idx {
115 0 => array.chunk_offsets.to_array(),
116 n => array.chunks()[n - 1].clone(),
117 }
118 }
119
120 fn child_name(_array: &ChunkedArray, idx: usize) -> String {
121 match idx {
122 0 => "chunk_offsets".to_string(),
123 n => format!("chunks[{}]", n - 1),
124 }
125 }
126
127 fn metadata(_array: &ChunkedArray) -> VortexResult<Self::Metadata> {
128 Ok(EmptyMetadata)
129 }
130
131 fn serialize(_metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
132 Ok(Some(vec![]))
133 }
134
135 fn deserialize(
136 _bytes: &[u8],
137 _dtype: &DType,
138 _len: usize,
139 _buffers: &[BufferHandle],
140 _session: &VortexSession,
141 ) -> VortexResult<Self::Metadata> {
142 Ok(EmptyMetadata)
143 }
144
145 fn build(
146 dtype: &DType,
147 _len: usize,
148 _metadata: &Self::Metadata,
149 _buffers: &[BufferHandle],
150 children: &dyn ArrayChildren,
151 ) -> VortexResult<ChunkedArray> {
152 if children.is_empty() {
153 vortex_bail!("Chunked array needs at least one child");
154 }
155
156 let nchunks = children.len() - 1;
157
158 let chunk_offsets_array = children
160 .get(
161 0,
162 &DType::Primitive(PType::U64, Nullability::NonNullable),
163 nchunks + 1,
165 )?
166 .to_primitive();
167
168 let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
169
170 let chunks = chunk_offsets_buf
172 .iter()
173 .tuple_windows()
174 .enumerate()
175 .map(|(idx, (start, end))| {
176 let chunk_len = usize::try_from(end - start)
177 .map_err(|_| vortex_err!("chunk_len {} exceeds usize range", end - start))?;
178 children.get(idx + 1, dtype, chunk_len)
179 })
180 .try_collect()?;
181
182 let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
183
184 let total_len = chunk_offsets_buf
185 .last()
186 .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
187 let len = usize::try_from(*total_len)
188 .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
189
190 Ok(ChunkedArray {
192 dtype: dtype.clone(),
193 len,
194 chunk_offsets,
195 chunks,
196 stats_set: Default::default(),
197 })
198 }
199
200 fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
201 vortex_ensure!(
203 !children.is_empty(),
204 "Chunked array needs at least one child"
205 );
206
207 let nchunks = children.len() - 1;
208 let chunk_offsets_array = children[0].to_primitive();
209 let chunk_offsets_buf = chunk_offsets_array.to_buffer::<u64>();
210
211 vortex_ensure!(
212 chunk_offsets_buf.len() == nchunks + 1,
213 "Expected {} chunk offsets, found {}",
214 nchunks + 1,
215 chunk_offsets_buf.len()
216 );
217
218 let chunks = children.into_iter().skip(1).collect();
219 array.chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
220 array.chunks = chunks;
221
222 let total_len = chunk_offsets_buf
223 .last()
224 .ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
225 array.len = usize::try_from(*total_len)
226 .map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
227
228 Ok(())
229 }
230
231 fn append_to_builder(
232 array: &ChunkedArray,
233 builder: &mut dyn ArrayBuilder,
234 ctx: &mut ExecutionCtx,
235 ) -> VortexResult<()> {
236 for chunk in array.chunks() {
237 chunk.append_to_builder(builder, ctx)?;
238 }
239 Ok(())
240 }
241
242 fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
243 Ok(_canonicalize(array, ctx)?.into_array())
244 }
245
246 fn reduce(array: &Self::Array) -> VortexResult<Option<ArrayRef>> {
247 Ok(match array.chunks.len() {
248 0 => Some(Canonical::empty(array.dtype()).into_array()),
249 1 => Some(array.chunks[0].clone()),
250 _ => None,
251 })
252 }
253
254 fn reduce_parent(
255 array: &Self::Array,
256 parent: &ArrayRef,
257 child_idx: usize,
258 ) -> VortexResult<Option<ArrayRef>> {
259 PARENT_RULES.evaluate(array, parent, child_idx)
260 }
261
262 fn execute_parent(
263 array: &Self::Array,
264 parent: &ArrayRef,
265 child_idx: usize,
266 ctx: &mut ExecutionCtx,
267 ) -> VortexResult<Option<ArrayRef>> {
268 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
269 }
270}