vortex_array/arrays/chunked/array.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! First-class chunked arrays.
//!
//! Vortex is a chunked array library that's able to represent an array as a sequence of chunks
//! that all share the same [`DType`], exposed as a single logical array.
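//!
//! A minimal construction sketch; the `vortex_array::arrays::ChunkedArray` re-export path is
//! assumed here, so the block is marked `ignore`:
//!
//! ```ignore
//! use vortex_array::IntoArray;
//! use vortex_array::arrays::ChunkedArray;
//! use vortex_buffer::buffer;
//! use vortex_dtype::{DType, Nullability, PType};
//!
//! // Two equally-typed chunks become one logical array of length 6.
//! let chunked = ChunkedArray::try_new(
//!     vec![
//!         buffer![1u64, 2, 3].into_array(),
//!         buffer![4u64, 5, 6].into_array(),
//!     ],
//!     DType::Primitive(PType::U64, Nullability::NonNullable),
//! )
//! .unwrap();
//! assert_eq!(chunked.nchunks(), 2);
//! ```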

use std::fmt::Debug;

use futures_util::stream;
use itertools::Itertools;
use vortex_buffer::{Buffer, BufferMut};
use vortex_dtype::DType;
use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
use vortex_mask::Mask;

use crate::arrays::ChunkedVTable;
use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
use crate::search_sorted::{SearchSorted, SearchSortedSide};
use crate::stats::{ArrayStats, StatsSetRef};
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::vtable::{ArrayVTable, ValidityVTable};
use crate::{Array, ArrayRef, IntoArray};

#[derive(Clone, Debug)]
pub struct ChunkedArray {
    dtype: DType,
    len: usize,
    chunk_offsets: Buffer<u64>,
    chunks: Vec<ArrayRef>,
    stats_set: ArrayStats,
}

impl ChunkedArray {
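    /// Create a new `ChunkedArray` from a set of chunks, validating that every chunk's
    /// [`DType`] matches `dtype`.
    ///
    /// # Errors
    ///
    /// Returns an error if any chunk has a mismatched `DType`.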
    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
        for chunk in &chunks {
            if chunk.dtype() != &dtype {
                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
            }
        }

        // SAFETY: validation done above
        unsafe { Ok(Self::new_unchecked(chunks, dtype)) }
    }

    /// Create a new `ChunkedArray` from a set of chunks without verifying that all chunks have
    /// the same DType.
    ///
    /// # Safety
    ///
    /// The caller must ensure that all chunks have the same DType, else downstream operations
    /// may break correctness assumptions about `ChunkedArray` child types.
    pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
        let nchunks = chunks.len();

        let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
        // SAFETY: capacity was reserved above for nchunks + 1 offsets
        unsafe { chunk_offsets.push_unchecked(0) }
        let mut curr_offset = 0;
        for c in &chunks {
            curr_offset += c.len() as u64;
            // SAFETY: capacity was reserved above for nchunks + 1 offsets
            unsafe { chunk_offsets.push_unchecked(curr_offset) }
        }

        Self {
            dtype,
            len: curr_offset.try_into().vortex_unwrap(),
            chunk_offsets: chunk_offsets.freeze(),
            chunks,
            stats_set: Default::default(),
        }
    }

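    /// Return a reference to the chunk at `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds (i.e. `idx >= self.nchunks()`).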
    #[inline]
    pub fn chunk(&self, idx: usize) -> &ArrayRef {
        assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");

        &self.chunks[idx]
    }

    /// The number of chunks in this array.
    pub fn nchunks(&self) -> usize {
        self.chunks.len()
    }

    /// The cumulative row offsets of the chunks, always starting with 0 and of length
    /// `nchunks() + 1`.
    #[inline]
    pub fn chunk_offsets(&self) -> &Buffer<u64> {
        &self.chunk_offsets
    }

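    /// Return the chunk index and the offset within that chunk for row `index`.
    ///
    /// Empty chunks produce duplicate entries in `chunk_offsets`, so the lookup resolves to the
    /// last chunk starting at the given offset (exercised by
    /// `test_find_chunk_idx_skips_empty_chunks` below).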
    pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
        assert!(index <= self.len(), "Index out of bounds of the array");
        let index = index as u64;

        // Offsets may contain duplicates when there are empty chunks, so search from the right
        // and take the last matching chunk (subtracting 1 to account for the leading 0).
        let index_chunk = self
            .chunk_offsets()
            .search_sorted(&index, SearchSortedSide::Right)
            .to_ends_index(self.nchunks() + 1)
            .saturating_sub(1);
        let chunk_start = self.chunk_offsets()[index_chunk];

        let index_in_chunk =
            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
        (index_chunk, index_in_chunk)
    }

    pub fn chunks(&self) -> &[ArrayRef] {
        &self.chunks
    }

    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
        self.chunks().iter().filter(|c| !c.is_empty())
    }

    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
    }

    pub fn array_stream(&self) -> impl ArrayStream + '_ {
        ArrayStreamAdapter::new(
            self.dtype().clone(),
            stream::iter(self.chunks().iter().cloned().map(Ok)),
        )
    }

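    /// Re-chunk this array, greedily coalescing adjacent chunks until adding the next chunk
    /// would exceed either `target_bytesize` or `target_rowsize`. Chunks already larger than
    /// either target on their own are passed through unchanged; coalesced runs are
    /// canonicalized into a single chunk.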
    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
        let mut new_chunks = Vec::new();
        let mut chunks_to_combine = Vec::new();
        let mut new_chunk_n_bytes = 0;
        let mut new_chunk_n_elements = 0;
        for chunk in self.chunks() {
            let n_bytes = chunk.nbytes();
            let n_elements = chunk.len();

            if (new_chunk_n_bytes + n_bytes > target_bytesize
                || new_chunk_n_elements + n_elements > target_rowsize)
                && !chunks_to_combine.is_empty()
            {
                new_chunks.push(
                    // SAFETY: combining chunks of same type maintains valid chunk types
                    unsafe {
                        ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
                            .to_canonical()?
                            .into_array()
                    },
                );

                new_chunk_n_bytes = 0;
                new_chunk_n_elements = 0;
                chunks_to_combine = Vec::new();
            }

            if n_bytes > target_bytesize || n_elements > target_rowsize {
                new_chunks.push(chunk.clone());
            } else {
                new_chunk_n_bytes += n_bytes;
                new_chunk_n_elements += n_elements;
                chunks_to_combine.push(chunk.clone());
            }
        }

        if !chunks_to_combine.is_empty() {
            new_chunks.push(unsafe {
                // SAFETY: combining chunks of same type maintains valid chunk types
                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
                    .to_canonical()?
                    .into_array()
            });
        }

        // SAFETY: combining chunks of same type maintains valid chunk types
        unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
    }
}

impl FromIterator<ArrayRef> for ChunkedArray {
    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
        let dtype = chunks
            .first()
            .map(|c| c.dtype().clone())
            .vortex_expect("Cannot infer DType from an empty iterator");
        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
    }
}

impl ArrayVTable<ChunkedVTable> for ChunkedVTable {
    fn len(array: &ChunkedArray) -> usize {
        array.len
    }

    fn dtype(array: &ChunkedArray) -> &DType {
        &array.dtype
    }

    fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
        array.stats_set.to_ref(array.as_ref())
    }
}

impl ValidityVTable<ChunkedVTable> for ChunkedVTable {
    fn is_valid(array: &ChunkedArray, index: usize) -> VortexResult<bool> {
        if !array.dtype.is_nullable() {
            return Ok(true);
        }
        let (chunk, offset_in_chunk) = array.find_chunk_idx(index);
        array.chunk(chunk).is_valid(offset_in_chunk)
    }

    fn all_valid(array: &ChunkedArray) -> VortexResult<bool> {
        if !array.dtype().is_nullable() {
            return Ok(true);
        }
        for chunk in array.non_empty_chunks() {
            if !chunk.all_valid()? {
                return Ok(false);
            }
        }
        Ok(true)
    }

    fn all_invalid(array: &ChunkedArray) -> VortexResult<bool> {
        if !array.dtype().is_nullable() {
            return Ok(false);
        }
        for chunk in array.non_empty_chunks() {
            if !chunk.all_invalid()? {
                return Ok(false);
            }
        }
        Ok(true)
    }

    fn validity_mask(array: &ChunkedArray) -> VortexResult<Mask> {
        array
            .chunks()
            .iter()
            .map(|a| a.validity_mask())
            .try_collect()
    }
}

#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::arrays::{ChunkedVTable, PrimitiveArray};
    use crate::compute::sub_scalar;
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical, assert_arrays_eq};

    fn chunked_array() -> ChunkedArray {
        ChunkedArray::try_new(
            vec![
                buffer![1u64, 2, 3].into_array(),
                buffer![4u64, 5, 6].into_array(),
                buffer![7u64, 8, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap()
    }
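
    // A small sketch of the chunk bookkeeping built by `new_unchecked`: offsets are cumulative
    // row counts with a leading zero, and `chunk()` hands back the underlying chunk. This only
    // exercises accessors already used elsewhere in this file.
    #[test]
    fn test_chunk_offsets_and_accessors() {
        let chunked = chunked_array();

        assert_eq!(chunked.nchunks(), 3);
        // Offsets: leading 0, then cumulative lengths 3, 6, 9.
        assert_eq!(chunked.chunk_offsets()[0], 0);
        assert_eq!(chunked.chunk_offsets()[1], 3);
        assert_eq!(chunked.chunk_offsets()[2], 6);
        assert_eq!(chunked.chunk_offsets()[3], 9);

        // The last chunk holds the final three values.
        let last = chunked.chunk(2).to_primitive().unwrap();
        assert_eq!(last.as_slice::<u64>(), &[7u64, 8, 9]);
    }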

    #[test]
    fn test_scalar_subtract() {
        let chunked = chunked_array().into_array();
        let to_subtract = 1u64;
        let array = sub_scalar(&chunked, to_subtract.into()).unwrap();

        let chunked = array.as_::<ChunkedVTable>();
        let chunks_out = chunked.chunks();

        let results = chunks_out[0]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[0u64, 1, 2]);
        let results = chunks_out[1]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[3u64, 4, 5]);
        let results = chunks_out[2]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[6u64, 7, 8]);
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        // greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]

        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }
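
    // A sketch of how `find_chunk_idx` resolves row indices when empty chunks introduce
    // duplicate entries in `chunk_offsets`: the lookup lands on the last chunk starting at a
    // given offset. `find_chunk_idx` is pub(crate), so it is callable from this in-crate test.
    #[test]
    fn test_find_chunk_idx_skips_empty_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![1u64, 2, 3].into_array(),
                buffer![0u64; 0].into_array(), // empty chunk, duplicates offset 3
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        // Offsets are [0, 3, 3, 5].
        assert_eq!(chunked.find_chunk_idx(0), (0, 0));
        assert_eq!(chunked.find_chunk_idx(2), (0, 2));
        // Row 3 resolves past the empty chunk into the last chunk.
        assert_eq!(chunked.find_chunk_idx(3), (2, 0));
        assert_eq!(chunked.find_chunk_idx(4), (2, 1));
    }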

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all valid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_valid since all non-empty chunks are all_valid
        assert!(chunked.all_valid()?);
        assert!(!chunked.all_invalid()?);

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all invalid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_invalid since all non-empty chunks are all_invalid
        assert!(!chunked.all_valid()?);
        assert!(chunked.all_invalid()?);

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        // Create chunks with mixed validity including empty chunks
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be neither all_valid nor all_invalid
        assert!(!chunked.all_valid()?);
        assert!(!chunked.all_invalid()?);

        Ok(())
    }
}