vortex_array/arrays/chunked/
array.rs1use std::fmt::Debug;
9
10use futures::stream;
11use vortex_buffer::{Buffer, BufferMut};
12use vortex_dtype::DType;
13use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
14use vortex_mask::Mask;
15
16use crate::arrays::ChunkedVTable;
17use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
18use crate::search_sorted::{SearchSorted, SearchSortedSide};
19use crate::stats::{ArrayStats, StatsSetRef};
20use crate::stream::{ArrayStream, ArrayStreamAdapter};
21use crate::vtable::{ArrayVTable, ValidityVTable};
22use crate::{Array, ArrayRef, IntoArray};
23
/// An array whose elements are stored as an ordered sequence of chunks.
///
/// All chunks are expected to have the same [`DType`] as the array itself; `try_new` enforces
/// this, while `new_unchecked` leaves it to the caller.
#[derive(Clone, Debug)]
pub struct ChunkedArray {
    /// Logical type of the array (and of every chunk).
    dtype: DType,
    /// Total number of elements, i.e. the sum of all chunk lengths.
    len: usize,
    /// Cumulative chunk boundaries: `nchunks + 1` entries beginning with `0`. Entry `i` is the
    /// logical offset of the first element of chunk `i`; the last entry equals `len`.
    chunk_offsets: Buffer<u64>,
    /// The chunks, in logical order (empty chunks are kept).
    chunks: Vec<ArrayRef>,
    /// Statistics storage, exposed through `ArrayVTable::stats`.
    stats_set: ArrayStats,
}
32
33impl ChunkedArray {
34 pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
43 Self::validate(&chunks, &dtype)?;
44
45 unsafe { Ok(Self::new_unchecked(chunks, dtype)) }
47 }
48
49 pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
58 let nchunks = chunks.len();
59
60 let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
61 unsafe { chunk_offsets.push_unchecked(0) }
63 let mut curr_offset = 0;
64 for c in &chunks {
65 curr_offset += c.len() as u64;
66 unsafe { chunk_offsets.push_unchecked(curr_offset) }
68 }
69
70 Self {
71 dtype,
72 len: curr_offset.try_into().vortex_unwrap(),
73 chunk_offsets: chunk_offsets.freeze(),
74 chunks,
75 stats_set: Default::default(),
76 }
77 }
78
79 pub(crate) fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
83 for chunk in chunks {
84 if chunk.dtype() != dtype {
85 vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
86 }
87 }
88
89 Ok(())
90 }
91
92 #[inline]
93 pub fn chunk(&self, idx: usize) -> &ArrayRef {
94 assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");
95
96 &self.chunks[idx]
97 }
98
99 pub fn nchunks(&self) -> usize {
100 self.chunks.len()
101 }
102
103 #[inline]
104 pub fn chunk_offsets(&self) -> &Buffer<u64> {
105 &self.chunk_offsets
106 }
107
108 pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
109 assert!(index <= self.len(), "Index out of bounds of the array");
110 let index = index as u64;
111
112 let index_chunk = self
115 .chunk_offsets()
116 .search_sorted(&index, SearchSortedSide::Right)
117 .to_ends_index(self.nchunks() + 1)
118 .saturating_sub(1);
119 let chunk_start = self.chunk_offsets()[index_chunk];
120
121 let index_in_chunk =
122 usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
123 (index_chunk, index_in_chunk)
124 }
125
126 pub fn chunks(&self) -> &[ArrayRef] {
127 &self.chunks
128 }
129
130 pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
131 self.chunks().iter().filter(|c| !c.is_empty())
132 }
133
134 pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
135 ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
136 }
137
138 pub fn array_stream(&self) -> impl ArrayStream + '_ {
139 ArrayStreamAdapter::new(
140 self.dtype().clone(),
141 stream::iter(self.chunks().iter().cloned().map(Ok)),
142 )
143 }
144
145 pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
146 let mut new_chunks = Vec::new();
147 let mut chunks_to_combine = Vec::new();
148 let mut new_chunk_n_bytes = 0;
149 let mut new_chunk_n_elements = 0;
150 for chunk in self.chunks() {
151 let n_bytes = chunk.nbytes();
152 let n_elements = chunk.len();
153
154 if (new_chunk_n_bytes + n_bytes > target_bytesize
155 || new_chunk_n_elements + n_elements > target_rowsize)
156 && !chunks_to_combine.is_empty()
157 {
158 new_chunks.push(
159 unsafe {
162 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
163 .to_canonical()
164 .into_array()
165 },
166 );
167
168 new_chunk_n_bytes = 0;
169 new_chunk_n_elements = 0;
170 chunks_to_combine = Vec::new();
171 }
172
173 if n_bytes > target_bytesize || n_elements > target_rowsize {
174 new_chunks.push(chunk.clone());
175 } else {
176 new_chunk_n_bytes += n_bytes;
177 new_chunk_n_elements += n_elements;
178 chunks_to_combine.push(chunk.clone());
179 }
180 }
181
182 if !chunks_to_combine.is_empty() {
183 new_chunks.push(unsafe {
184 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
187 .to_canonical()
188 .into_array()
189 });
190 }
191
192 unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
195 }
196}
197
198impl FromIterator<ArrayRef> for ChunkedArray {
199 fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
200 let chunks: Vec<ArrayRef> = iter.into_iter().collect();
201 let dtype = chunks
202 .first()
203 .map(|c| c.dtype().clone())
204 .vortex_expect("Cannot infer DType from an empty iterator");
205 Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
206 }
207}
208
209impl ArrayVTable<ChunkedVTable> for ChunkedVTable {
210 fn len(array: &ChunkedArray) -> usize {
211 array.len
212 }
213
214 fn dtype(array: &ChunkedArray) -> &DType {
215 &array.dtype
216 }
217
218 fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
219 array.stats_set.to_ref(array.as_ref())
220 }
221}
222
223impl ValidityVTable<ChunkedVTable> for ChunkedVTable {
224 fn is_valid(array: &ChunkedArray, index: usize) -> bool {
225 if !array.dtype.is_nullable() {
226 return true;
227 }
228 let (chunk, offset_in_chunk) = array.find_chunk_idx(index);
229 array.chunk(chunk).is_valid(offset_in_chunk)
230 }
231
232 fn all_valid(array: &ChunkedArray) -> bool {
233 if !array.dtype().is_nullable() {
234 return true;
235 }
236 for chunk in array.non_empty_chunks() {
237 if !chunk.all_valid() {
238 return false;
239 }
240 }
241 true
242 }
243
244 fn all_invalid(array: &ChunkedArray) -> bool {
245 if !array.dtype().is_nullable() {
246 return false;
247 }
248 for chunk in array.non_empty_chunks() {
249 if !chunk.all_invalid() {
250 return false;
251 }
252 }
253 true
254 }
255
256 fn validity_mask(array: &ChunkedArray) -> Mask {
257 array.chunks().iter().map(|a| a.validity_mask()).collect()
258 }
259}
260
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::arrays::{ChunkedVTable, PrimitiveArray};
    use crate::compute::sub_scalar;
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical, assert_arrays_eq};

    /// Non-nullable `u64` dtype used by the non-validity tests.
    fn u64_dtype() -> DType {
        DType::Primitive(PType::U64, Nullability::NonNullable)
    }

    /// Builds a nullable `u64` chunked array from primitive chunks.
    fn nullable_chunked(
        chunks: impl IntoIterator<Item = PrimitiveArray>,
    ) -> VortexResult<ChunkedArray> {
        ChunkedArray::try_new(
            chunks.into_iter().map(IntoArray::into_array).collect(),
            DType::Primitive(PType::U64, Nullability::Nullable),
        )
    }

    /// Three non-nullable chunks holding the values 1 through 9.
    fn chunked_array() -> ChunkedArray {
        let chunks = vec![
            buffer![1u64, 2, 3].into_array(),
            buffer![4u64, 5, 6].into_array(),
            buffer![7u64, 8, 9].into_array(),
        ];
        ChunkedArray::try_new(chunks, u64_dtype()).unwrap()
    }

    #[test]
    fn test_scalar_subtract() {
        let input = chunked_array().into_array();
        let output = sub_scalar(&input, 1u64.into()).unwrap();
        let chunks_out = output.as_::<ChunkedVTable>().chunks();

        let expected: [&[u64]; 3] = [&[0, 1, 2], &[3, 4, 5], &[6, 7, 8]];
        for (i, want) in expected.iter().enumerate() {
            let got = chunks_out[i].to_primitive();
            assert_eq!(got.as_slice::<u64>(), *want);
        }
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked =
            ChunkedArray::try_new(vec![buffer![0u64].into_array()], u64_dtype()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunks = vec![buffer![0u64].into_array(), buffer![5u64].into_array()];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        // Both tiny chunks fit the targets and are merged into one.
        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunks = vec![
            buffer![0u64, 1, 2, 3].into_array(),
            buffer![4u64, 5].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // 4 + 2 rows exceeds the 5-row target, so the chunks stay separate.
        assert_eq!(rechunked.nchunks(), 2);
        assert!(!rechunked.chunks().iter().any(|c| c.len() >= 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunks = vec![
            buffer![0u64, 1, 2].into_array(),
            buffer![42_u64; 6].into_array(),
            buffer![4u64, 5].into_array(),
            buffer![6u64, 7].into_array(),
            buffer![8u64, 9].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // Expected grouping: [0,1,2] | [42; 6] (oversized, passed through) | [4,5,6,7] | [8,9]
        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        let chunked = nullable_chunked([
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid),
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid),
        ])?;

        assert!(chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        let chunked = nullable_chunked([
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid),
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid),
        ])?;

        assert!(!chunked.all_valid());
        assert!(chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        let chunked = nullable_chunked([
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid),
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid),
        ])?;

        assert!(!chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }
}