vortex_array/arrays/chunked/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
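//! First-class chunked arrays.
//!
//! Vortex is a chunked array library: a [`ChunkedArray`] represents one logical array as an
//! ordered sequence of child arrays ("chunks") that all share the same `DType`.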
7
8use std::fmt::Debug;
9
10use futures_util::stream;
11use vortex_buffer::{Buffer, BufferMut};
12use vortex_dtype::DType;
13use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
14use vortex_mask::Mask;
15
16use crate::arrays::ChunkedVTable;
17use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
18use crate::search_sorted::{SearchSorted, SearchSortedSide};
19use crate::stats::{ArrayStats, StatsSetRef};
20use crate::stream::{ArrayStream, ArrayStreamAdapter};
21use crate::vtable::{ArrayVTable, ValidityVTable};
22use crate::{Array, ArrayRef, IntoArray};
23
24#[derive(Clone, Debug)]
25pub struct ChunkedArray {
26    dtype: DType,
27    len: usize,
28    chunk_offsets: Buffer<u64>,
29    chunks: Vec<ArrayRef>,
30    stats_set: ArrayStats,
31}
32
33impl ChunkedArray {
34    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
35        for chunk in &chunks {
36            if chunk.dtype() != &dtype {
37                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
38            }
39        }
40
41        // SAFETY: validation done above
42        unsafe { Ok(Self::new_unchecked(chunks, dtype)) }
43    }
44
45    /// Create a new `ChunkedArray` from a set of chunks without verifying that all chunks have
46    /// the same DType.
47    ///
48    /// # Safety
49    ///
50    /// The caller must ensure that all chunks have the same DType, else downstream operations
51    /// may break correctness assumptions about `ChunkedArray` child types.
52    pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
53        let nchunks = chunks.len();
54
55        let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
56        // SAFETY: nchunks + 1
57        unsafe { chunk_offsets.push_unchecked(0) }
58        let mut curr_offset = 0;
59        for c in &chunks {
60            curr_offset += c.len() as u64;
61            // SAFETY: nchunks + 1
62            unsafe { chunk_offsets.push_unchecked(curr_offset) }
63        }
64
65        Self {
66            dtype,
67            len: curr_offset.try_into().vortex_unwrap(),
68            chunk_offsets: chunk_offsets.freeze(),
69            chunks,
70            stats_set: Default::default(),
71        }
72    }
73
74    #[inline]
75    pub fn chunk(&self, idx: usize) -> &ArrayRef {
76        assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");
77
78        &self.chunks[idx]
79    }
80
81    pub fn nchunks(&self) -> usize {
82        self.chunks.len()
83    }
84
85    #[inline]
86    pub fn chunk_offsets(&self) -> &Buffer<u64> {
87        &self.chunk_offsets
88    }
89
90    pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
91        assert!(index <= self.len(), "Index out of bounds of the array");
92        let index = index as u64;
93
94        // Since there might be duplicate values in offsets because of empty chunks we want to search from right
95        // and take the last chunk (we subtract 1 since there's a leading 0)
96        let index_chunk = self
97            .chunk_offsets()
98            .search_sorted(&index, SearchSortedSide::Right)
99            .to_ends_index(self.nchunks() + 1)
100            .saturating_sub(1);
101        let chunk_start = self.chunk_offsets()[index_chunk];
102
103        let index_in_chunk =
104            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
105        (index_chunk, index_in_chunk)
106    }
107
108    pub fn chunks(&self) -> &[ArrayRef] {
109        &self.chunks
110    }
111
112    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
113        self.chunks().iter().filter(|c| !c.is_empty())
114    }
115
116    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
117        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
118    }
119
120    pub fn array_stream(&self) -> impl ArrayStream + '_ {
121        ArrayStreamAdapter::new(
122            self.dtype().clone(),
123            stream::iter(self.chunks().iter().cloned().map(Ok)),
124        )
125    }
126
127    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
128        let mut new_chunks = Vec::new();
129        let mut chunks_to_combine = Vec::new();
130        let mut new_chunk_n_bytes = 0;
131        let mut new_chunk_n_elements = 0;
132        for chunk in self.chunks() {
133            let n_bytes = chunk.nbytes();
134            let n_elements = chunk.len();
135
136            if (new_chunk_n_bytes + n_bytes > target_bytesize
137                || new_chunk_n_elements + n_elements > target_rowsize)
138                && !chunks_to_combine.is_empty()
139            {
140                new_chunks.push(
141                    // SAFETY: combining chunks of same type maintains valid chunk types
142                    unsafe {
143                        ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
144                            .to_canonical()?
145                            .into_array()
146                    },
147                );
148
149                new_chunk_n_bytes = 0;
150                new_chunk_n_elements = 0;
151                chunks_to_combine = Vec::new();
152            }
153
154            if n_bytes > target_bytesize || n_elements > target_rowsize {
155                new_chunks.push(chunk.clone());
156            } else {
157                new_chunk_n_bytes += n_bytes;
158                new_chunk_n_elements += n_elements;
159                chunks_to_combine.push(chunk.clone());
160            }
161        }
162
163        if !chunks_to_combine.is_empty() {
164            new_chunks.push(unsafe {
165                // SAFETY: combining chunks of same type maintains valid chunk types
166                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
167                    .to_canonical()?
168                    .into_array()
169            });
170        }
171
172        // SAFETY: combining chunks of same type maintains valid chunk types
173        unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
174    }
175}
176
177impl FromIterator<ArrayRef> for ChunkedArray {
178    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
179        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
180        let dtype = chunks
181            .first()
182            .map(|c| c.dtype().clone())
183            .vortex_expect("Cannot infer DType from an empty iterator");
184        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
185    }
186}
187
188impl ArrayVTable<ChunkedVTable> for ChunkedVTable {
189    fn len(array: &ChunkedArray) -> usize {
190        array.len
191    }
192
193    fn dtype(array: &ChunkedArray) -> &DType {
194        &array.dtype
195    }
196
197    fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
198        array.stats_set.to_ref(array.as_ref())
199    }
200}
201
202impl ValidityVTable<ChunkedVTable> for ChunkedVTable {
203    fn is_valid(array: &ChunkedArray, index: usize) -> bool {
204        if !array.dtype.is_nullable() {
205            return true;
206        }
207        let (chunk, offset_in_chunk) = array.find_chunk_idx(index);
208        array.chunk(chunk).is_valid(offset_in_chunk)
209    }
210
211    fn all_valid(array: &ChunkedArray) -> bool {
212        if !array.dtype().is_nullable() {
213            return true;
214        }
215        for chunk in array.non_empty_chunks() {
216            if !chunk.all_valid() {
217                return false;
218            }
219        }
220        true
221    }
222
223    fn all_invalid(array: &ChunkedArray) -> bool {
224        if !array.dtype().is_nullable() {
225            return false;
226        }
227        for chunk in array.non_empty_chunks() {
228            if !chunk.all_invalid() {
229                return false;
230            }
231        }
232        true
233    }
234
235    fn validity_mask(array: &ChunkedArray) -> Mask {
236        array.chunks().iter().map(|a| a.validity_mask()).collect()
237    }
238}
239
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::arrays::{ChunkedVTable, PrimitiveArray};
    use crate::compute::sub_scalar;
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical, assert_arrays_eq};

    /// Three non-nullable `u64` chunks of three elements each, holding 1..=9.
    fn chunked_array() -> ChunkedArray {
        ChunkedArray::try_new(
            vec![
                buffer![1u64, 2, 3].into_array(),
                buffer![4u64, 5, 6].into_array(),
                buffer![7u64, 8, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap()
    }

    #[test]
    fn test_scalar_subtract() {
        let chunked = chunked_array().into_array();
        let array = sub_scalar(&chunked, 1u64.into()).unwrap();

        // The result must stay chunked, with each chunk shifted down by one.
        let chunks_out = array.as_::<ChunkedVTable>().chunks();
        let expected: [&[u64]; 3] = [&[0, 1, 2], &[3, 4, 5], &[6, 7, 8]];
        assert_eq!(chunks_out.len(), expected.len());
        for (chunk, want) in chunks_out.iter().zip(expected) {
            let got = chunk.to_primitive().unwrap().as_slice::<u64>().to_vec();
            assert_eq!(got, want);
        }
    }

    #[test]
    fn test_find_chunk_idx_skips_empty_chunks() {
        // Offsets are [0, 2, 2, 5]: the empty middle chunk duplicates offset 2.
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![1u64, 2].into_array(),
                buffer![0u64; 0].into_array(),
                buffer![3u64, 4, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        assert_eq!(chunked.find_chunk_idx(0), (0, 0));
        assert_eq!(chunked.find_chunk_idx(1), (0, 1));
        // Index 2 must resolve to the non-empty third chunk, not the empty second one.
        assert_eq!(chunked.find_chunk_idx(2), (2, 0));
        assert_eq!(chunked.find_chunk_idx(4), (2, 2));
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // Greedy grouping: [0, 1, 2] [42 x 6] [4, 5, 6, 7] [8, 9]
        let chunk_lens: Vec<usize> = rechunked.chunks().iter().map(|c| c.len()).collect();
        assert_eq!(chunk_lens, [3, 6, 4, 2]);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all valid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_valid since all non-empty chunks are all_valid
        assert!(chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all invalid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_invalid since all non-empty chunks are all_invalid
        assert!(!chunked.all_valid());
        assert!(chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        // Create chunks with mixed validity including empty chunks
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be neither all_valid nor all_invalid
        assert!(!chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }
}