vortex_array/arrays/chunked/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! First-class chunked arrays.
5//!
//! Vortex is a chunked array library: a [`ChunkedArray`] presents an ordered sequence of
//! same-dtype chunks as one logical array.
7
8use std::fmt::Debug;
9
10use futures::stream;
11use vortex_buffer::{Buffer, BufferMut};
12use vortex_dtype::DType;
13use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
14use vortex_mask::Mask;
15
16use crate::arrays::ChunkedVTable;
17use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
18use crate::search_sorted::{SearchSorted, SearchSortedSide};
19use crate::stats::{ArrayStats, StatsSetRef};
20use crate::stream::{ArrayStream, ArrayStreamAdapter};
21use crate::vtable::{ArrayVTable, ValidityVTable};
22use crate::{Array, ArrayRef, IntoArray};
23
#[derive(Clone, Debug)]
pub struct ChunkedArray {
    /// The common [`DType`] shared by every chunk.
    dtype: DType,
    /// Total logical length: the sum of all chunk lengths.
    len: usize,
    /// `nchunks + 1` cumulative row offsets starting at 0; entry `i` is the first
    /// logical row index of chunk `i`, and the last entry equals `len`.
    chunk_offsets: Buffer<u64>,
    /// The chunk arrays, concatenated logically in order.
    chunks: Vec<ArrayRef>,
    /// Lazily populated statistics for the array as a whole.
    stats_set: ArrayStats,
}
32
impl ChunkedArray {
    /// Constructs a new `ChunkedArray`.
    ///
    /// See [`ChunkedArray::new_unchecked`] for more information.
    ///
    /// # Errors
    ///
    /// Returns an error if the provided components do not satisfy the invariants documented in
    /// [`ChunkedArray::new_unchecked`].
    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
        Self::validate(&chunks, &dtype)?;

        // SAFETY: validation done above.
        unsafe { Ok(Self::new_unchecked(chunks, dtype)) }
    }

    /// Creates a new [`ChunkedArray`] without validation from these components:
    ///
    /// * `chunks` is a vector of arrays to be concatenated logically.
    /// * `dtype` is the common data type of all chunks.
    ///
    /// # Safety
    ///
    /// All chunks must have exactly the same [`DType`] as the provided `dtype`.
    pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
        let nchunks = chunks.len();

        // Precompute cumulative offsets so per-index lookups (`find_chunk_idx`) can
        // binary-search instead of scanning every chunk.
        let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
        // SAFETY: capacity of nchunks + 1 was reserved above; this is the first push.
        unsafe { chunk_offsets.push_unchecked(0) }
        let mut curr_offset = 0;
        for c in &chunks {
            curr_offset += c.len() as u64;
            // SAFETY: this loop pushes exactly nchunks more elements, staying within the
            // reserved capacity of nchunks + 1.
            unsafe { chunk_offsets.push_unchecked(curr_offset) }
        }

        Self {
            dtype,
            // The final cumulative offset is the total logical length; converting back to
            // usize can only fail on platforms where usize is narrower than u64.
            len: curr_offset.try_into().vortex_unwrap(),
            chunk_offsets: chunk_offsets.freeze(),
            chunks,
            stats_set: Default::default(),
        }
    }

    /// Validates the components that would be used to create a [`ChunkedArray`].
    ///
    /// This function checks all the invariants required by [`ChunkedArray::new_unchecked`].
    pub(crate) fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
        for chunk in chunks {
            if chunk.dtype() != dtype {
                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
            }
        }

        Ok(())
    }

    /// Returns a reference to the chunk at `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `idx >= self.nchunks()`.
    #[inline]
    pub fn chunk(&self, idx: usize) -> &ArrayRef {
        assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");

        &self.chunks[idx]
    }

    /// Returns the number of chunks (which may be zero, or include empty chunks).
    pub fn nchunks(&self) -> usize {
        self.chunks.len()
    }

    /// Returns the `nchunks + 1` cumulative row offsets; entry `i` is the first logical
    /// row index of chunk `i`.
    #[inline]
    pub fn chunk_offsets(&self) -> &Buffer<u64> {
        &self.chunk_offsets
    }

    /// Maps a logical row `index` to `(chunk_index, index_within_chunk)`.
    ///
    /// Note that `index == self.len()` is accepted (useful for exclusive range ends);
    /// callers must handle the resulting position themselves.
    pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
        assert!(index <= self.len(), "Index out of bounds of the array");
        let index = index as u64;

        // Since there might be duplicate values in offsets because of empty chunks we want to search from right
        // and take the last chunk (we subtract 1 since there's a leading 0)
        let index_chunk = self
            .chunk_offsets()
            .search_sorted(&index, SearchSortedSide::Right)
            .to_ends_index(self.nchunks() + 1)
            .saturating_sub(1);
        let chunk_start = self.chunk_offsets()[index_chunk];

        let index_in_chunk =
            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
        (index_chunk, index_in_chunk)
    }

    /// Returns all chunks in logical order.
    pub fn chunks(&self) -> &[ArrayRef] {
        &self.chunks
    }

    /// Iterates over the chunks, skipping any zero-length ones.
    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
        self.chunks().iter().filter(|c| !c.is_empty())
    }

    /// Returns an [`ArrayIterator`] yielding each chunk in order.
    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
    }

    /// Returns an [`ArrayStream`] yielding each chunk in order.
    pub fn array_stream(&self) -> impl ArrayStream + '_ {
        ArrayStreamAdapter::new(
            self.dtype().clone(),
            stream::iter(self.chunks().iter().cloned().map(Ok)),
        )
    }

    /// Greedily combines adjacent small chunks so output chunks stay at or below
    /// `target_bytesize` bytes and `target_rowsize` elements.
    ///
    /// A chunk that individually exceeds either target is passed through unchanged
    /// (never split); combined runs are canonicalized into a single chunk. Logical
    /// element order is preserved.
    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
        let mut new_chunks = Vec::new();
        // Run of small chunks accumulated so far, plus its running totals.
        let mut chunks_to_combine = Vec::new();
        let mut new_chunk_n_bytes = 0;
        let mut new_chunk_n_elements = 0;
        for chunk in self.chunks() {
            let n_bytes = chunk.nbytes();
            let n_elements = chunk.len();

            // Adding this chunk would push the pending run over a target: flush the run
            // first so order is preserved.
            if (new_chunk_n_bytes + n_bytes > target_bytesize
                || new_chunk_n_elements + n_elements > target_rowsize)
                && !chunks_to_combine.is_empty()
            {
                new_chunks.push(
                    // SAFETY: chunks_to_combine contains valid chunks of the same dtype as self.
                    // All chunks are guaranteed to be valid arrays matching self.dtype().
                    unsafe {
                        ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
                            .to_canonical()
                            .into_array()
                    },
                );

                new_chunk_n_bytes = 0;
                new_chunk_n_elements = 0;
                chunks_to_combine = Vec::new();
            }

            if n_bytes > target_bytesize || n_elements > target_rowsize {
                // Oversized chunk: emit as-is rather than splitting it.
                new_chunks.push(chunk.clone());
            } else {
                new_chunk_n_bytes += n_bytes;
                new_chunk_n_elements += n_elements;
                chunks_to_combine.push(chunk.clone());
            }
        }

        // Flush any trailing run of small chunks.
        if !chunks_to_combine.is_empty() {
            new_chunks.push(unsafe {
                // SAFETY: chunks_to_combine contains valid chunks of the same dtype as self.
                // All chunks are guaranteed to be valid arrays matching self.dtype().
                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
                    .to_canonical()
                    .into_array()
            });
        }

        // SAFETY: new_chunks contains valid arrays of the same dtype as self.
        // All chunks were either taken from self or created from self's chunks.
        unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
    }
}
197
198impl FromIterator<ArrayRef> for ChunkedArray {
199    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
200        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
201        let dtype = chunks
202            .first()
203            .map(|c| c.dtype().clone())
204            .vortex_expect("Cannot infer DType from an empty iterator");
205        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
206    }
207}
208
impl ArrayVTable<ChunkedVTable> for ChunkedVTable {
    // Total logical length, precomputed at construction as the sum of chunk lengths.
    fn len(array: &ChunkedArray) -> usize {
        array.len
    }

    // The common dtype shared by every chunk.
    fn dtype(array: &ChunkedArray) -> &DType {
        &array.dtype
    }

    // Statistics for the whole chunked array (lazily populated).
    fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
        array.stats_set.to_ref(array.as_ref())
    }
}
222
223impl ValidityVTable<ChunkedVTable> for ChunkedVTable {
224    fn is_valid(array: &ChunkedArray, index: usize) -> bool {
225        if !array.dtype.is_nullable() {
226            return true;
227        }
228        let (chunk, offset_in_chunk) = array.find_chunk_idx(index);
229        array.chunk(chunk).is_valid(offset_in_chunk)
230    }
231
232    fn all_valid(array: &ChunkedArray) -> bool {
233        if !array.dtype().is_nullable() {
234            return true;
235        }
236        for chunk in array.non_empty_chunks() {
237            if !chunk.all_valid() {
238                return false;
239            }
240        }
241        true
242    }
243
244    fn all_invalid(array: &ChunkedArray) -> bool {
245        if !array.dtype().is_nullable() {
246            return false;
247        }
248        for chunk in array.non_empty_chunks() {
249            if !chunk.all_invalid() {
250                return false;
251            }
252        }
253        true
254    }
255
256    fn validity_mask(array: &ChunkedArray) -> Mask {
257        array.chunks().iter().map(|a| a.validity_mask()).collect()
258    }
259}
260
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::arrays::{ChunkedVTable, PrimitiveArray};
    use crate::compute::sub_scalar;
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical, assert_arrays_eq};

    // A 9-element u64 array split into three equal chunks: [1,2,3] [4,5,6] [7,8,9].
    fn chunked_array() -> ChunkedArray {
        ChunkedArray::try_new(
            vec![
                buffer![1u64, 2, 3].into_array(),
                buffer![4u64, 5, 6].into_array(),
                buffer![7u64, 8, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap()
    }

    // Scalar subtraction should apply per chunk and preserve the chunking.
    #[test]
    fn test_scalar_subtract() {
        let chunked = chunked_array().into_array();
        let to_subtract = 1u64;
        let array = sub_scalar(&chunked, to_subtract.into()).unwrap();

        let chunked = array.as_::<ChunkedVTable>();
        let chunks_out = chunked.chunks();

        let results = chunks_out[0].to_primitive().as_slice::<u64>().to_vec();
        assert_eq!(results, &[0u64, 1, 2]);
        let results = chunks_out[1].to_primitive().as_slice::<u64>().to_vec();
        assert_eq!(results, &[3u64, 4, 5]);
        let results = chunks_out[2].to_primitive().as_slice::<u64>().to_vec();
        assert_eq!(results, &[6u64, 7, 8]);
    }

    // Rechunking a single-chunk array with generous targets is a no-op on contents.
    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    // Two tiny chunks should be combined into one under generous targets.
    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    // A small row target keeps chunks strictly under the limit.
    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    // An oversized chunk is passed through unchanged while its neighbors combine.
    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        // greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]

        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all valid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_valid since all non-empty chunks are all_valid
        assert!(chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all invalid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_invalid since all non-empty chunks are all_invalid
        assert!(!chunked.all_valid());
        assert!(chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        // Create chunks with mixed validity including empty chunks
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be neither all_valid nor all_invalid
        assert!(!chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }
}