vortex_array/arrays/chunked/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! First-class chunked arrays.
5//!
//! Vortex is a chunked array library that's able to treat an ordered sequence of arrays sharing
//! one data type as a single logical array.
7
8use std::fmt::Debug;
9
10use futures::stream;
11use vortex_buffer::Buffer;
12use vortex_buffer::BufferMut;
13use vortex_dtype::DType;
14use vortex_error::VortexExpect as _;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17
18use crate::Array;
19use crate::ArrayRef;
20use crate::IntoArray;
21use crate::arrays::PrimitiveArray;
22use crate::iter::ArrayIterator;
23use crate::iter::ArrayIteratorAdapter;
24use crate::search_sorted::SearchSorted;
25use crate::search_sorted::SearchSortedSide;
26use crate::stats::ArrayStats;
27use crate::stream::ArrayStream;
28use crate::stream::ArrayStreamAdapter;
29use crate::validity::Validity;
30
/// A logical array made up of an ordered sequence of chunks that all share one [`DType`].
#[derive(Clone, Debug)]
pub struct ChunkedArray {
    /// Data type that every chunk is validated against.
    pub(super) dtype: DType,
    /// Total number of elements, i.e. the sum of all chunk lengths.
    pub(super) len: usize,
    /// Cumulative chunk boundaries as non-nullable `u64` values: a leading `0` followed by the
    /// running end offset of each chunk, giving `chunks.len() + 1` entries.
    pub(super) chunk_offsets: PrimitiveArray,
    /// The chunks, in logical order.
    pub(super) chunks: Vec<ArrayRef>,
    /// Statistics holder; starts out as the `Default` value when built by
    /// [`ChunkedArray::new_unchecked`].
    pub(super) stats_set: ArrayStats,
}
39
40impl ChunkedArray {
41    /// Constructs a new `ChunkedArray`.
42    ///
43    /// See [`ChunkedArray::new_unchecked`] for more information.
44    ///
45    /// # Errors
46    ///
47    /// Returns an error if the provided components do not satisfy the invariants documented in
48    /// [`ChunkedArray::new_unchecked`].
49    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
50        Self::validate(&chunks, &dtype)?;
51
52        // SAFETY: validation done above.
53        unsafe { Ok(Self::new_unchecked(chunks, dtype)) }
54    }
55
56    /// Creates a new [`ChunkedArray`] without validation from these components:
57    ///
58    /// * `chunks` is a vector of arrays to be concatenated logically.
59    /// * `dtype` is the common data type of all chunks.
60    ///
61    /// # Safety
62    ///
63    /// All chunks must have exactly the same [`DType`] as the provided `dtype`.
64    pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
65        #[cfg(debug_assertions)]
66        Self::validate(&chunks, &dtype)
67            .vortex_expect("[Debug Assertion]: Invalid `ChunkedArray` parameters");
68
69        let nchunks = chunks.len();
70
71        let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(nchunks + 1);
72        // SAFETY: nchunks + 1
73        unsafe { chunk_offsets_buf.push_unchecked(0) }
74        let mut curr_offset = 0;
75        for c in &chunks {
76            curr_offset += c.len() as u64;
77            // SAFETY: nchunks + 1
78            unsafe { chunk_offsets_buf.push_unchecked(curr_offset) }
79        }
80
81        let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable);
82
83        Self {
84            dtype,
85            len: curr_offset
86                .try_into()
87                .vortex_expect("chunk offset must fit in usize"),
88            chunk_offsets,
89            chunks,
90            stats_set: Default::default(),
91        }
92    }
93
94    /// Validates the components that would be used to create a [`ChunkedArray`].
95    ///
96    /// This function checks all the invariants required by [`ChunkedArray::new_unchecked`].
97    pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
98        for chunk in chunks {
99            if chunk.dtype() != dtype {
100                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
101            }
102        }
103
104        Ok(())
105    }
106
107    #[inline]
108    pub fn chunk(&self, idx: usize) -> &ArrayRef {
109        assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");
110
111        &self.chunks[idx]
112    }
113
114    pub fn nchunks(&self) -> usize {
115        self.chunks.len()
116    }
117
118    #[inline]
119    pub fn chunk_offsets(&self) -> Buffer<u64> {
120        self.chunk_offsets.buffer()
121    }
122
123    pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
124        assert!(index <= self.len(), "Index out of bounds of the array");
125        let index = index as u64;
126
127        // Since there might be duplicate values in offsets because of empty chunks we want to search from right
128        // and take the last chunk (we subtract 1 since there's a leading 0)
129        let index_chunk = self
130            .chunk_offsets()
131            .search_sorted(&index, SearchSortedSide::Right)
132            .to_ends_index(self.nchunks() + 1)
133            .saturating_sub(1);
134        let chunk_start = self.chunk_offsets()[index_chunk];
135
136        let index_in_chunk =
137            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
138        (index_chunk, index_in_chunk)
139    }
140
141    pub fn chunks(&self) -> &[ArrayRef] {
142        &self.chunks
143    }
144
145    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
146        self.chunks().iter().filter(|c| !c.is_empty())
147    }
148
149    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
150        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
151    }
152
153    pub fn array_stream(&self) -> impl ArrayStream + '_ {
154        ArrayStreamAdapter::new(
155            self.dtype().clone(),
156            stream::iter(self.chunks().iter().cloned().map(Ok)),
157        )
158    }
159
160    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
161        let mut new_chunks = Vec::new();
162        let mut chunks_to_combine = Vec::new();
163        let mut new_chunk_n_bytes = 0;
164        let mut new_chunk_n_elements = 0;
165        for chunk in self.chunks() {
166            let n_bytes = chunk.nbytes();
167            let n_elements = chunk.len();
168
169            if (new_chunk_n_bytes + n_bytes > target_bytesize
170                || new_chunk_n_elements + n_elements > target_rowsize)
171                && !chunks_to_combine.is_empty()
172            {
173                new_chunks.push(
174                    // SAFETY: chunks_to_combine contains valid chunks of the same dtype as self.
175                    // All chunks are guaranteed to be valid arrays matching self.dtype().
176                    unsafe {
177                        ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
178                            .to_canonical()
179                            .into_array()
180                    },
181                );
182
183                new_chunk_n_bytes = 0;
184                new_chunk_n_elements = 0;
185                chunks_to_combine = Vec::new();
186            }
187
188            if n_bytes > target_bytesize || n_elements > target_rowsize {
189                new_chunks.push(chunk.clone());
190            } else {
191                new_chunk_n_bytes += n_bytes;
192                new_chunk_n_elements += n_elements;
193                chunks_to_combine.push(chunk.clone());
194            }
195        }
196
197        if !chunks_to_combine.is_empty() {
198            new_chunks.push(unsafe {
199                // SAFETY: chunks_to_combine contains valid chunks of the same dtype as self.
200                // All chunks are guaranteed to be valid arrays matching self.dtype().
201                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
202                    .to_canonical()
203                    .into_array()
204            });
205        }
206
207        // SAFETY: new_chunks contains valid arrays of the same dtype as self.
208        // All chunks were either taken from self or created from self's chunks.
209        unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
210    }
211}
212
213impl FromIterator<ArrayRef> for ChunkedArray {
214    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
215        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
216        let dtype = chunks
217            .first()
218            .map(|c| c.dtype().clone())
219            .vortex_expect("Cannot infer DType from an empty iterator");
220        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
221    }
222}
223
224#[cfg(test)]
225mod test {
226    use vortex_buffer::buffer;
227    use vortex_dtype::DType;
228    use vortex_dtype::Nullability;
229    use vortex_dtype::PType;
230    use vortex_error::VortexResult;
231
232    use crate::IntoArray;
233    use crate::ToCanonical;
234    use crate::array::Array;
235    use crate::arrays::ChunkedVTable;
236    use crate::arrays::PrimitiveArray;
237    use crate::arrays::chunked::ChunkedArray;
238    use crate::assert_arrays_eq;
239    use crate::compute::sub_scalar;
240    use crate::validity::Validity;
241
242    fn chunked_array() -> ChunkedArray {
243        ChunkedArray::try_new(
244            vec![
245                buffer![1u64, 2, 3].into_array(),
246                buffer![4u64, 5, 6].into_array(),
247                buffer![7u64, 8, 9].into_array(),
248            ],
249            DType::Primitive(PType::U64, Nullability::NonNullable),
250        )
251        .unwrap()
252    }
253
254    #[test]
255    fn test_scalar_subtract() {
256        let chunked = chunked_array().into_array();
257        let to_subtract = 1u64;
258        let array = sub_scalar(&chunked, to_subtract.into()).unwrap();
259
260        let chunked = array.as_::<ChunkedVTable>();
261        let chunks_out = chunked.chunks();
262
263        let results = chunks_out[0].to_primitive().as_slice::<u64>().to_vec();
264        assert_eq!(results, &[0u64, 1, 2]);
265        let results = chunks_out[1].to_primitive().as_slice::<u64>().to_vec();
266        assert_eq!(results, &[3u64, 4, 5]);
267        let results = chunks_out[2].to_primitive().as_slice::<u64>().to_vec();
268        assert_eq!(results, &[6u64, 7, 8]);
269    }
270
271    #[test]
272    fn test_rechunk_one_chunk() {
273        let chunked = ChunkedArray::try_new(
274            vec![buffer![0u64].into_array()],
275            DType::Primitive(PType::U64, Nullability::NonNullable),
276        )
277        .unwrap();
278
279        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();
280
281        assert_arrays_eq!(chunked, rechunked);
282    }
283
284    #[test]
285    fn test_rechunk_two_chunks() {
286        let chunked = ChunkedArray::try_new(
287            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
288            DType::Primitive(PType::U64, Nullability::NonNullable),
289        )
290        .unwrap();
291
292        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();
293
294        assert_eq!(rechunked.nchunks(), 1);
295        assert_arrays_eq!(chunked, rechunked);
296    }
297
298    #[test]
299    fn test_rechunk_tiny_target_chunks() {
300        let chunked = ChunkedArray::try_new(
301            vec![
302                buffer![0u64, 1, 2, 3].into_array(),
303                buffer![4u64, 5].into_array(),
304            ],
305            DType::Primitive(PType::U64, Nullability::NonNullable),
306        )
307        .unwrap();
308
309        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
310
311        assert_eq!(rechunked.nchunks(), 2);
312        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
313        assert_arrays_eq!(chunked, rechunked);
314    }
315
316    #[test]
317    fn test_rechunk_with_too_big_chunk() {
318        let chunked = ChunkedArray::try_new(
319            vec![
320                buffer![0u64, 1, 2].into_array(),
321                buffer![42_u64; 6].into_array(),
322                buffer![4u64, 5].into_array(),
323                buffer![6u64, 7].into_array(),
324                buffer![8u64, 9].into_array(),
325            ],
326            DType::Primitive(PType::U64, Nullability::NonNullable),
327        )
328        .unwrap();
329
330        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
331        // greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]
332
333        assert_eq!(rechunked.nchunks(), 4);
334        assert_arrays_eq!(chunked, rechunked);
335    }
336
337    #[test]
338    fn test_empty_chunks_all_valid() -> VortexResult<()> {
339        // Create chunks where some are empty but all non-empty chunks have all valid values
340        let chunks = vec![
341            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
342            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
343            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
344            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
345        ];
346
347        let chunked =
348            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;
349
350        // Should be all_valid since all non-empty chunks are all_valid
351        assert!(chunked.all_valid());
352        assert!(!chunked.all_invalid());
353
354        Ok(())
355    }
356
357    #[test]
358    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
359        // Create chunks where some are empty but all non-empty chunks have all invalid values
360        let chunks = vec![
361            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
362            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
363            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
364            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
365        ];
366
367        let chunked =
368            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;
369
370        // Should be all_invalid since all non-empty chunks are all_invalid
371        assert!(!chunked.all_valid());
372        assert!(chunked.all_invalid());
373
374        Ok(())
375    }
376
377    #[test]
378    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
379        // Create chunks with mixed validity including empty chunks
380        let chunks = vec![
381            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
382            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
383            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
384            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
385        ];
386
387        let chunked =
388            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;
389
390        // Should be neither all_valid nor all_invalid
391        assert!(!chunked.all_valid());
392        assert!(!chunked.all_invalid());
393
394        Ok(())
395    }
396}