vortex_array/arrays/chunked/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! First-class chunked arrays.
5//!
//! Vortex is a chunked array library: a [`ChunkedArray`] logically concatenates a sequence of
//! arrays ("chunks") that all share the same `DType`.
7
8use std::fmt::Debug;
9
10use futures::stream;
11use vortex_buffer::Buffer;
12use vortex_buffer::BufferMut;
13use vortex_dtype::DType;
14use vortex_error::VortexExpect as _;
15use vortex_error::VortexResult;
16use vortex_error::VortexUnwrap;
17use vortex_error::vortex_bail;
18
19use crate::Array;
20use crate::ArrayRef;
21use crate::IntoArray;
22use crate::arrays::PrimitiveArray;
23use crate::iter::ArrayIterator;
24use crate::iter::ArrayIteratorAdapter;
25use crate::search_sorted::SearchSorted;
26use crate::search_sorted::SearchSortedSide;
27use crate::stats::ArrayStats;
28use crate::stream::ArrayStream;
29use crate::stream::ArrayStreamAdapter;
30use crate::validity::Validity;
31
/// A logical concatenation of several arrays ("chunks") that all share one [`DType`].
#[derive(Clone, Debug)]
pub struct ChunkedArray {
    /// The data type every chunk is required to have.
    pub(super) dtype: DType,
    /// Total number of elements, i.e. the sum of the lengths of all chunks.
    pub(super) len: usize,
    /// Cumulative chunk boundaries as non-nullable `u64` values: a leading `0` followed by the
    /// running total of chunk lengths, giving `chunks.len() + 1` entries.
    pub(super) chunk_offsets: PrimitiveArray,
    /// The chunks themselves, in logical order (empty chunks are allowed).
    pub(super) chunks: Vec<ArrayRef>,
    /// Statistics storage for this array; starts out as `Default::default()`.
    pub(super) stats_set: ArrayStats,
}
40
41impl ChunkedArray {
42    /// Constructs a new `ChunkedArray`.
43    ///
44    /// See [`ChunkedArray::new_unchecked`] for more information.
45    ///
46    /// # Errors
47    ///
48    /// Returns an error if the provided components do not satisfy the invariants documented in
49    /// [`ChunkedArray::new_unchecked`].
50    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
51        Self::validate(&chunks, &dtype)?;
52
53        // SAFETY: validation done above.
54        unsafe { Ok(Self::new_unchecked(chunks, dtype)) }
55    }
56
57    /// Creates a new [`ChunkedArray`] without validation from these components:
58    ///
59    /// * `chunks` is a vector of arrays to be concatenated logically.
60    /// * `dtype` is the common data type of all chunks.
61    ///
62    /// # Safety
63    ///
64    /// All chunks must have exactly the same [`DType`] as the provided `dtype`.
65    pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
66        #[cfg(debug_assertions)]
67        Self::validate(&chunks, &dtype)
68            .vortex_expect("[Debug Assertion]: Invalid `ChunkedArray` parameters");
69
70        let nchunks = chunks.len();
71
72        let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(nchunks + 1);
73        // SAFETY: nchunks + 1
74        unsafe { chunk_offsets_buf.push_unchecked(0) }
75        let mut curr_offset = 0;
76        for c in &chunks {
77            curr_offset += c.len() as u64;
78            // SAFETY: nchunks + 1
79            unsafe { chunk_offsets_buf.push_unchecked(curr_offset) }
80        }
81
82        let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable);
83
84        Self {
85            dtype,
86            len: curr_offset.try_into().vortex_unwrap(),
87            chunk_offsets,
88            chunks,
89            stats_set: Default::default(),
90        }
91    }
92
93    /// Validates the components that would be used to create a [`ChunkedArray`].
94    ///
95    /// This function checks all the invariants required by [`ChunkedArray::new_unchecked`].
96    pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
97        for chunk in chunks {
98            if chunk.dtype() != dtype {
99                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
100            }
101        }
102
103        Ok(())
104    }
105
106    #[inline]
107    pub fn chunk(&self, idx: usize) -> &ArrayRef {
108        assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");
109
110        &self.chunks[idx]
111    }
112
113    pub fn nchunks(&self) -> usize {
114        self.chunks.len()
115    }
116
117    #[inline]
118    pub fn chunk_offsets(&self) -> Buffer<u64> {
119        self.chunk_offsets.buffer()
120    }
121
122    pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
123        assert!(index <= self.len(), "Index out of bounds of the array");
124        let index = index as u64;
125
126        // Since there might be duplicate values in offsets because of empty chunks we want to search from right
127        // and take the last chunk (we subtract 1 since there's a leading 0)
128        let index_chunk = self
129            .chunk_offsets()
130            .search_sorted(&index, SearchSortedSide::Right)
131            .to_ends_index(self.nchunks() + 1)
132            .saturating_sub(1);
133        let chunk_start = self.chunk_offsets()[index_chunk];
134
135        let index_in_chunk =
136            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
137        (index_chunk, index_in_chunk)
138    }
139
140    pub fn chunks(&self) -> &[ArrayRef] {
141        &self.chunks
142    }
143
144    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
145        self.chunks().iter().filter(|c| !c.is_empty())
146    }
147
148    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
149        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
150    }
151
152    pub fn array_stream(&self) -> impl ArrayStream + '_ {
153        ArrayStreamAdapter::new(
154            self.dtype().clone(),
155            stream::iter(self.chunks().iter().cloned().map(Ok)),
156        )
157    }
158
159    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
160        let mut new_chunks = Vec::new();
161        let mut chunks_to_combine = Vec::new();
162        let mut new_chunk_n_bytes = 0;
163        let mut new_chunk_n_elements = 0;
164        for chunk in self.chunks() {
165            let n_bytes = chunk.nbytes();
166            let n_elements = chunk.len();
167
168            if (new_chunk_n_bytes + n_bytes > target_bytesize
169                || new_chunk_n_elements + n_elements > target_rowsize)
170                && !chunks_to_combine.is_empty()
171            {
172                new_chunks.push(
173                    // SAFETY: chunks_to_combine contains valid chunks of the same dtype as self.
174                    // All chunks are guaranteed to be valid arrays matching self.dtype().
175                    unsafe {
176                        ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
177                            .to_canonical()
178                            .into_array()
179                    },
180                );
181
182                new_chunk_n_bytes = 0;
183                new_chunk_n_elements = 0;
184                chunks_to_combine = Vec::new();
185            }
186
187            if n_bytes > target_bytesize || n_elements > target_rowsize {
188                new_chunks.push(chunk.clone());
189            } else {
190                new_chunk_n_bytes += n_bytes;
191                new_chunk_n_elements += n_elements;
192                chunks_to_combine.push(chunk.clone());
193            }
194        }
195
196        if !chunks_to_combine.is_empty() {
197            new_chunks.push(unsafe {
198                // SAFETY: chunks_to_combine contains valid chunks of the same dtype as self.
199                // All chunks are guaranteed to be valid arrays matching self.dtype().
200                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
201                    .to_canonical()
202                    .into_array()
203            });
204        }
205
206        // SAFETY: new_chunks contains valid arrays of the same dtype as self.
207        // All chunks were either taken from self or created from self's chunks.
208        unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
209    }
210}
211
212impl FromIterator<ArrayRef> for ChunkedArray {
213    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
214        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
215        let dtype = chunks
216            .first()
217            .map(|c| c.dtype().clone())
218            .vortex_expect("Cannot infer DType from an empty iterator");
219        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
220    }
221}
222
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_dtype::PType;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::ToCanonical;
    use crate::array::Array;
    use crate::arrays::ChunkedVTable;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::chunked::ChunkedArray;
    use crate::assert_arrays_eq;
    use crate::compute::sub_scalar;
    use crate::validity::Validity;

    /// The `u64` primitive dtype with the requested nullability, used by every fixture below.
    fn u64_dtype(nullability: Nullability) -> DType {
        DType::Primitive(PType::U64, nullability)
    }

    /// Three non-nullable `u64` chunks holding the values 1 through 9.
    fn chunked_array() -> ChunkedArray {
        let chunks = vec![
            buffer![1u64, 2, 3].into_array(),
            buffer![4u64, 5, 6].into_array(),
            buffer![7u64, 8, 9].into_array(),
        ];

        ChunkedArray::try_new(chunks, u64_dtype(Nullability::NonNullable)).unwrap()
    }

    #[test]
    fn test_scalar_subtract() {
        let input = chunked_array().into_array();
        let subtrahend = 1u64;
        let subtracted = sub_scalar(&input, subtrahend.into()).unwrap();

        let chunked = subtracted.as_::<ChunkedVTable>();
        let chunks_out = chunked.chunks();

        // Every element of every chunk is expected to be reduced by one.
        let expected = [[0u64, 1, 2], [3u64, 4, 5], [6u64, 7, 8]];
        for (idx, want) in expected.iter().enumerate() {
            let got = chunks_out[idx].to_primitive().as_slice::<u64>().to_vec();
            assert_eq!(got, want);
        }
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let original = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            u64_dtype(Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = original.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(original, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let original = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            u64_dtype(Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = original.rechunk(1 << 16, 1 << 16).unwrap();

        // Both tiny chunks fit within the targets, so they are merged into one.
        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(original, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let original = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            u64_dtype(Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = original.rechunk(1 << 16, 5).unwrap();

        // 4 + 2 rows exceed the row target of 5, so the two chunks stay separate.
        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|chunk| chunk.len() < 5));
        assert_arrays_eq!(original, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let original = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            u64_dtype(Nullability::NonNullable),
        )
        .unwrap();

        // Greedy grouping should give: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]
        let rechunked = original.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(original, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Some chunks are empty; every non-empty chunk consists of valid values only.
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
        ];

        let chunked = ChunkedArray::try_new(chunks, u64_dtype(Nullability::Nullable))?;

        // All non-empty chunks are all-valid, so the whole array must be as well.
        assert!(chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Some chunks are empty; every non-empty chunk consists of invalid values only.
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked = ChunkedArray::try_new(chunks, u64_dtype(Nullability::Nullable))?;

        // All non-empty chunks are all-invalid, so the whole array must be as well.
        assert!(!chunked.all_valid());
        assert!(chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        // A mix of all-valid and all-invalid chunks, interleaved with empty ones.
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked = ChunkedArray::try_new(chunks, u64_dtype(Nullability::Nullable))?;

        // Neither extreme holds for the array as a whole.
        assert!(!chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }
}