vortex_array/arrays/chunked/
array.rs1use std::fmt::Debug;
9
10use futures_util::stream;
11use itertools::Itertools;
12use vortex_buffer::{Buffer, BufferMut};
13use vortex_dtype::DType;
14use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
15use vortex_mask::Mask;
16
17use crate::arrays::ChunkedVTable;
18use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
19use crate::search_sorted::{SearchSorted, SearchSortedSide};
20use crate::stats::{ArrayStats, StatsSetRef};
21use crate::stream::{ArrayStream, ArrayStreamAdapter};
22use crate::vtable::{ArrayVTable, ValidityVTable};
23use crate::{Array, ArrayRef, IntoArray};
24
25#[derive(Clone, Debug)]
26pub struct ChunkedArray {
27 dtype: DType,
28 len: usize,
29 chunk_offsets: Buffer<u64>,
30 chunks: Vec<ArrayRef>,
31 stats_set: ArrayStats,
32}
33
34impl ChunkedArray {
35 pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
36 for chunk in &chunks {
37 if chunk.dtype() != &dtype {
38 vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
39 }
40 }
41
42 unsafe { Ok(Self::new_unchecked(chunks, dtype)) }
44 }
45
46 pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
54 let nchunks = chunks.len();
55
56 let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
57 unsafe { chunk_offsets.push_unchecked(0) }
59 let mut curr_offset = 0;
60 for c in &chunks {
61 curr_offset += c.len() as u64;
62 unsafe { chunk_offsets.push_unchecked(curr_offset) }
64 }
65
66 Self {
67 dtype,
68 len: curr_offset.try_into().vortex_unwrap(),
69 chunk_offsets: chunk_offsets.freeze(),
70 chunks,
71 stats_set: Default::default(),
72 }
73 }
74
75 #[inline]
76 pub fn chunk(&self, idx: usize) -> &ArrayRef {
77 assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");
78
79 &self.chunks[idx]
80 }
81
82 pub fn nchunks(&self) -> usize {
83 self.chunks.len()
84 }
85
86 #[inline]
87 pub fn chunk_offsets(&self) -> &Buffer<u64> {
88 &self.chunk_offsets
89 }
90
91 pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
92 assert!(index <= self.len(), "Index out of bounds of the array");
93 let index = index as u64;
94
95 let index_chunk = self
98 .chunk_offsets()
99 .search_sorted(&index, SearchSortedSide::Right)
100 .to_ends_index(self.nchunks() + 1)
101 .saturating_sub(1);
102 let chunk_start = self.chunk_offsets()[index_chunk];
103
104 let index_in_chunk =
105 usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
106 (index_chunk, index_in_chunk)
107 }
108
109 pub fn chunks(&self) -> &[ArrayRef] {
110 &self.chunks
111 }
112
113 pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
114 self.chunks().iter().filter(|c| !c.is_empty())
115 }
116
117 pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
118 ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
119 }
120
121 pub fn array_stream(&self) -> impl ArrayStream + '_ {
122 ArrayStreamAdapter::new(
123 self.dtype().clone(),
124 stream::iter(self.chunks().iter().cloned().map(Ok)),
125 )
126 }
127
128 pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
129 let mut new_chunks = Vec::new();
130 let mut chunks_to_combine = Vec::new();
131 let mut new_chunk_n_bytes = 0;
132 let mut new_chunk_n_elements = 0;
133 for chunk in self.chunks() {
134 let n_bytes = chunk.nbytes();
135 let n_elements = chunk.len();
136
137 if (new_chunk_n_bytes + n_bytes > target_bytesize
138 || new_chunk_n_elements + n_elements > target_rowsize)
139 && !chunks_to_combine.is_empty()
140 {
141 new_chunks.push(
142 unsafe {
144 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
145 .to_canonical()?
146 .into_array()
147 },
148 );
149
150 new_chunk_n_bytes = 0;
151 new_chunk_n_elements = 0;
152 chunks_to_combine = Vec::new();
153 }
154
155 if n_bytes > target_bytesize || n_elements > target_rowsize {
156 new_chunks.push(chunk.clone());
157 } else {
158 new_chunk_n_bytes += n_bytes;
159 new_chunk_n_elements += n_elements;
160 chunks_to_combine.push(chunk.clone());
161 }
162 }
163
164 if !chunks_to_combine.is_empty() {
165 new_chunks.push(unsafe {
166 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
168 .to_canonical()?
169 .into_array()
170 });
171 }
172
173 unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
175 }
176}
177
178impl FromIterator<ArrayRef> for ChunkedArray {
179 fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
180 let chunks: Vec<ArrayRef> = iter.into_iter().collect();
181 let dtype = chunks
182 .first()
183 .map(|c| c.dtype().clone())
184 .vortex_expect("Cannot infer DType from an empty iterator");
185 Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
186 }
187}
188
189impl ArrayVTable<ChunkedVTable> for ChunkedVTable {
190 fn len(array: &ChunkedArray) -> usize {
191 array.len
192 }
193
194 fn dtype(array: &ChunkedArray) -> &DType {
195 &array.dtype
196 }
197
198 fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
199 array.stats_set.to_ref(array.as_ref())
200 }
201}
202
203impl ValidityVTable<ChunkedVTable> for ChunkedVTable {
204 fn is_valid(array: &ChunkedArray, index: usize) -> VortexResult<bool> {
205 if !array.dtype.is_nullable() {
206 return Ok(true);
207 }
208 let (chunk, offset_in_chunk) = array.find_chunk_idx(index);
209 array.chunk(chunk).is_valid(offset_in_chunk)
210 }
211
212 fn all_valid(array: &ChunkedArray) -> VortexResult<bool> {
213 if !array.dtype().is_nullable() {
214 return Ok(true);
215 }
216 for chunk in array.non_empty_chunks() {
217 if !chunk.all_valid()? {
218 return Ok(false);
219 }
220 }
221 Ok(true)
222 }
223
224 fn all_invalid(array: &ChunkedArray) -> VortexResult<bool> {
225 if !array.dtype().is_nullable() {
226 return Ok(false);
227 }
228 for chunk in array.non_empty_chunks() {
229 if !chunk.all_invalid()? {
230 return Ok(false);
231 }
232 }
233 Ok(true)
234 }
235
236 fn validity_mask(array: &ChunkedArray) -> VortexResult<Mask> {
237 array
238 .chunks()
239 .iter()
240 .map(|a| a.validity_mask())
241 .try_collect()
242 }
243}
244
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::arrays::{ChunkedVTable, PrimitiveArray};
    use crate::compute::sub_scalar;
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical, assert_arrays_eq};

    /// Three non-nullable `u64` chunks holding `1..=9`, three values per chunk.
    fn chunked_array() -> ChunkedArray {
        ChunkedArray::try_new(
            vec![
                buffer![1u64, 2, 3].into_array(),
                buffer![4u64, 5, 6].into_array(),
                buffer![7u64, 8, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap()
    }

    /// Lengths of the chunks of `array`, in order.
    fn chunk_lens(array: &ChunkedArray) -> Vec<usize> {
        array.chunks().iter().map(|c| c.len()).collect()
    }

    #[test]
    fn test_scalar_subtract() {
        let chunked = chunked_array().into_array();
        let to_subtract = 1u64;
        let array = sub_scalar(&chunked, to_subtract.into()).unwrap();

        let chunked = array.as_::<ChunkedVTable>();
        let chunks_out = chunked.chunks();

        let results = chunks_out[0]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[0u64, 1, 2]);
        let results = chunks_out[1]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[3u64, 4, 5]);
        let results = chunks_out[2]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[6u64, 7, 8]);
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let target_rowsize = 5;
        let rechunked = chunked.rechunk(1 << 16, target_rowsize).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        // `rechunk` never grows a combined chunk past the row target, so the contract is `<=`.
        assert!(
            rechunked
                .chunks()
                .iter()
                .all(|c| c.len() <= target_rowsize)
        );
        // 4 + 2 rows would exceed 5, so the two chunks stay separate.
        assert_eq!(chunk_lens(&rechunked), [4, 2]);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        // The 6-row chunk exceeds the row target on its own and is passed through as-is;
        // the 2-row chunks after it are grouped as [2 + 2] and then [2].
        assert_eq!(rechunked.nchunks(), 4);
        assert_eq!(chunk_lens(&rechunked), [3, 6, 4, 2]);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Empty chunks are interleaved to make sure they do not affect validity summaries.
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        assert!(chunked.all_valid()?);
        assert!(!chunked.all_invalid()?);

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        assert!(!chunked.all_valid()?);
        assert!(chunked.all_invalid()?);

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        assert!(!chunked.all_valid()?);
        assert!(!chunked.all_invalid()?);

        Ok(())
    }
}