vortex_array/arrays/chunked/
mod.rs1use std::fmt::Debug;
6
7use futures_util::stream;
8use itertools::Itertools;
9use vortex_buffer::{Buffer, BufferMut};
10use vortex_dtype::DType;
11use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
12use vortex_mask::Mask;
13
14use crate::array::ArrayValidityImpl;
15use crate::compute::{ComputeFn, InvocationArgs, Output, SearchSorted, SearchSortedSide};
16use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
17use crate::nbytes::NBytes;
18use crate::stats::{ArrayStats, StatsSetRef};
19use crate::stream::{ArrayStream, ArrayStreamAdapter};
20use crate::vtable::VTableRef;
21use crate::{Array, ArrayImpl, ArrayRef, ArrayStatisticsImpl, EmptyMetadata, Encoding, IntoArray};
22
23mod canonical;
24mod compute;
25mod serde;
26mod variants;
27
28#[derive(Clone, Debug)]
29pub struct ChunkedArray {
30 dtype: DType,
31 len: usize,
32 chunk_offsets: Buffer<u64>,
33 chunks: Vec<ArrayRef>,
34 stats_set: ArrayStats,
35}
36
37#[derive(Debug)]
38pub struct ChunkedEncoding;
39impl Encoding for ChunkedEncoding {
40 type Array = ChunkedArray;
41 type Metadata = EmptyMetadata;
42}
43
44impl ChunkedArray {
45 pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
46 for chunk in &chunks {
47 if chunk.dtype() != &dtype {
48 vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
49 }
50 }
51
52 Ok(Self::new_unchecked(chunks, dtype))
53 }
54
55 pub fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
56 let nchunks = chunks.len();
57
58 let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
59 unsafe { chunk_offsets.push_unchecked(0) }
60 let mut curr_offset = 0;
61 for c in &chunks {
62 curr_offset += c.len() as u64;
63 unsafe { chunk_offsets.push_unchecked(curr_offset) }
64 }
65 assert_eq!(chunk_offsets.len(), nchunks + 1);
66
67 Self {
68 dtype,
69 len: curr_offset.try_into().vortex_unwrap(),
70 chunk_offsets: chunk_offsets.freeze(),
71 chunks,
72 stats_set: Default::default(),
73 }
74 }
75
76 #[inline]
78 pub fn chunk(&self, idx: usize) -> VortexResult<&ArrayRef> {
79 if idx >= self.nchunks() {
80 vortex_bail!("chunk index {} > num chunks ({})", idx, self.nchunks());
81 }
82 Ok(&self.chunks[idx])
83 }
84
85 pub fn nchunks(&self) -> usize {
86 self.chunks.len()
87 }
88
89 #[inline]
90 pub fn chunk_offsets(&self) -> &[u64] {
91 &self.chunk_offsets
92 }
93
94 fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
95 assert!(index <= self.len(), "Index out of bounds of the array");
96 let index = index as u64;
97
98 let index_chunk = self
101 .chunk_offsets()
102 .search_sorted(&index, SearchSortedSide::Right)
103 .to_ends_index(self.nchunks() + 1)
104 .saturating_sub(1);
105 let chunk_start = self.chunk_offsets()[index_chunk];
106
107 let index_in_chunk =
108 usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
109 (index_chunk, index_in_chunk)
110 }
111
112 pub fn chunks(&self) -> &[ArrayRef] {
113 &self.chunks
114 }
115
116 pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
117 self.chunks().iter().filter(|c| !c.is_empty())
118 }
119
120 pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
121 ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
122 }
123
124 pub fn array_stream(&self) -> impl ArrayStream + '_ {
125 ArrayStreamAdapter::new(
126 self.dtype().clone(),
127 stream::iter(self.chunks().iter().cloned().map(Ok)),
128 )
129 }
130
131 pub fn rechunk(&self, target_bytesize: usize, target_rowsize: usize) -> VortexResult<Self> {
132 let mut new_chunks = Vec::new();
133 let mut chunks_to_combine = Vec::new();
134 let mut new_chunk_n_bytes = 0;
135 let mut new_chunk_n_elements = 0;
136 for chunk in self.chunks() {
137 let n_bytes = chunk.nbytes();
138 let n_elements = chunk.len();
139
140 if (new_chunk_n_bytes + n_bytes > target_bytesize
141 || new_chunk_n_elements + n_elements > target_rowsize)
142 && !chunks_to_combine.is_empty()
143 {
144 new_chunks.push(
145 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
146 .to_canonical()?
147 .into_array(),
148 );
149
150 new_chunk_n_bytes = 0;
151 new_chunk_n_elements = 0;
152 chunks_to_combine = Vec::new();
153 }
154
155 if n_bytes > target_bytesize || n_elements > target_rowsize {
156 new_chunks.push(chunk.clone());
157 } else {
158 new_chunk_n_bytes += n_bytes;
159 new_chunk_n_elements += n_elements;
160 chunks_to_combine.push(chunk.clone());
161 }
162 }
163
164 if !chunks_to_combine.is_empty() {
165 new_chunks.push(
166 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
167 .to_canonical()?
168 .into_array(),
169 );
170 }
171
172 Ok(Self::new_unchecked(new_chunks, self.dtype().clone()))
173 }
174}
175
176impl FromIterator<ArrayRef> for ChunkedArray {
177 fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
178 let chunks: Vec<ArrayRef> = iter.into_iter().collect();
179 let dtype = chunks
180 .first()
181 .map(|c| c.dtype().clone())
182 .vortex_expect("Cannot infer DType from an empty iterator");
183 Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
184 }
185}
186
187impl ArrayImpl for ChunkedArray {
188 type Encoding = ChunkedEncoding;
189
190 fn _len(&self) -> usize {
191 self.len
192 }
193
194 fn _dtype(&self) -> &DType {
195 &self.dtype
196 }
197
198 fn _vtable(&self) -> VTableRef {
199 VTableRef::new_ref(&ChunkedEncoding)
200 }
201
202 fn _with_children(&self, children: &[ArrayRef]) -> VortexResult<Self> {
203 Ok(ChunkedArray::new_unchecked(
204 children[1..].to_vec(),
206 self.dtype.clone(),
207 ))
208 }
209
210 fn _invoke(
211 &self,
212 compute_fn: &ComputeFn,
213 args: &InvocationArgs,
214 ) -> VortexResult<Option<Output>> {
215 if compute_fn.is_elementwise() {
216 return self.invoke_elementwise(compute_fn, args);
217 }
218 Ok(None)
219 }
220}
221
222impl ArrayStatisticsImpl for ChunkedArray {
223 fn _stats_ref(&self) -> StatsSetRef<'_> {
224 self.stats_set.to_ref(self)
225 }
226}
227
228impl ArrayValidityImpl for ChunkedArray {
229 fn _is_valid(&self, index: usize) -> VortexResult<bool> {
230 if !self.dtype.is_nullable() {
231 return Ok(true);
232 }
233 let (chunk, offset_in_chunk) = self.find_chunk_idx(index);
234 self.chunk(chunk)?.is_valid(offset_in_chunk)
235 }
236
237 fn _all_valid(&self) -> VortexResult<bool> {
238 if !self.dtype().is_nullable() {
239 return Ok(true);
240 }
241 for chunk in self.chunks() {
242 if !chunk.all_valid()? {
243 return Ok(false);
244 }
245 }
246 Ok(true)
247 }
248
249 fn _all_invalid(&self) -> VortexResult<bool> {
250 if !self.dtype().is_nullable() {
251 return Ok(false);
252 }
253 for chunk in self.chunks() {
254 if !chunk.all_invalid()? {
255 return Ok(false);
256 }
257 }
258 Ok(true)
259 }
260
261 fn _validity_mask(&self) -> VortexResult<Mask> {
262 self.chunks()
263 .iter()
264 .map(|a| a.validity_mask())
265 .try_collect()
266 }
267}
268
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::compute::conformance::binary_numeric::test_numeric;
    use crate::compute::{cast, scalar_at, sub_scalar};
    use crate::{ArrayExt, IntoArray, ToCanonical, assert_arrays_eq};

    /// Three non-nullable `u64` chunks holding 1..=9.
    fn chunked_array() -> ChunkedArray {
        ChunkedArray::try_new(
            vec![
                buffer![1u64, 2, 3].into_array(),
                buffer![4u64, 5, 6].into_array(),
                buffer![7u64, 8, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap()
    }

    #[test]
    fn test_scalar_subtract() {
        let chunked = chunked_array().into_array();
        let to_subtract = 1u64;
        let array = sub_scalar(&chunked, to_subtract.into()).unwrap();

        // The result stays chunked, with each chunk shifted down by one.
        let chunks_out = array.as_::<ChunkedArray>().chunks();
        let expected: [&[u64]; 3] = [&[0, 1, 2], &[3, 4, 5], &[6, 7, 8]];
        for (idx, want) in expected.iter().enumerate() {
            let got = chunks_out[idx]
                .to_primitive()
                .unwrap()
                .as_slice::<u64>()
                .to_vec();
            assert_eq!(got, *want);
        }
    }

    #[test]
    fn test_chunk_offsets_and_bounds() {
        let chunked = chunked_array();

        assert_eq!(chunked.nchunks(), 3);
        assert_eq!(chunked.len(), 9);
        assert_eq!(chunked.chunk_offsets(), &[0u64, 3, 6, 9]);
        assert!(chunked.chunk(2).is_ok());
        assert!(chunked.chunk(3).is_err());
    }

    #[test]
    fn test_rechunk_empty() {
        let chunked = ChunkedArray::try_new(
            vec![],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 0);
        assert_eq!(rechunked.len(), 0);
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        // Greedy grouping: [0, 1, 2], [42; 6] (passed through), [4, 5, 6, 7], [8, 9].
        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_chunked_binary_numeric() {
        let array = chunked_array();
        let signed_dtype = DType::from(PType::try_from(array.dtype()).unwrap().to_signed());
        let array = cast(&array, &signed_dtype).unwrap();
        test_numeric::<u64>(array)
    }
}