vortex_array/arrays/chunked/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! First-class chunked arrays.
5//!
//! Vortex is a chunked array library: a [`ChunkedArray`] logically concatenates a sequence of
//! arrays ("chunks") that all share the same [`DType`].
7
8use std::fmt::Debug;
9
10use futures::stream;
11use vortex_buffer::{Buffer, BufferMut};
12use vortex_dtype::DType;
13use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
14
15use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
16use crate::search_sorted::{SearchSorted, SearchSortedSide};
17use crate::stats::ArrayStats;
18use crate::stream::{ArrayStream, ArrayStreamAdapter};
19use crate::{Array, ArrayRef, IntoArray};
20
21#[derive(Clone, Debug)]
22pub struct ChunkedArray {
23    pub(super) dtype: DType,
24    pub(super) len: usize,
25    pub(super) chunk_offsets: Buffer<u64>,
26    pub(super) chunks: Vec<ArrayRef>,
27    pub(super) stats_set: ArrayStats,
28}
29
30impl ChunkedArray {
31    /// Constructs a new `ChunkedArray`.
32    ///
33    /// See [`ChunkedArray::new_unchecked`] for more information.
34    ///
35    /// # Errors
36    ///
37    /// Returns an error if the provided components do not satisfy the invariants documented in
38    /// [`ChunkedArray::new_unchecked`].
39    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
40        Self::validate(&chunks, &dtype)?;
41
42        // SAFETY: validation done above.
43        unsafe { Ok(Self::new_unchecked(chunks, dtype)) }
44    }
45
46    /// Creates a new [`ChunkedArray`] without validation from these components:
47    ///
48    /// * `chunks` is a vector of arrays to be concatenated logically.
49    /// * `dtype` is the common data type of all chunks.
50    ///
51    /// # Safety
52    ///
53    /// All chunks must have exactly the same [`DType`] as the provided `dtype`.
54    pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
55        #[cfg(debug_assertions)]
56        Self::validate(&chunks, &dtype)
57            .vortex_expect("[Debug Assertion]: Invalid `ChunkedArray` parameters");
58
59        let nchunks = chunks.len();
60
61        let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
62        // SAFETY: nchunks + 1
63        unsafe { chunk_offsets.push_unchecked(0) }
64        let mut curr_offset = 0;
65        for c in &chunks {
66            curr_offset += c.len() as u64;
67            // SAFETY: nchunks + 1
68            unsafe { chunk_offsets.push_unchecked(curr_offset) }
69        }
70
71        Self {
72            dtype,
73            len: curr_offset.try_into().vortex_unwrap(),
74            chunk_offsets: chunk_offsets.freeze(),
75            chunks,
76            stats_set: Default::default(),
77        }
78    }
79
80    /// Validates the components that would be used to create a [`ChunkedArray`].
81    ///
82    /// This function checks all the invariants required by [`ChunkedArray::new_unchecked`].
83    pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
84        for chunk in chunks {
85            if chunk.dtype() != dtype {
86                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
87            }
88        }
89
90        Ok(())
91    }
92
93    #[inline]
94    pub fn chunk(&self, idx: usize) -> &ArrayRef {
95        assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");
96
97        &self.chunks[idx]
98    }
99
100    pub fn nchunks(&self) -> usize {
101        self.chunks.len()
102    }
103
104    #[inline]
105    pub fn chunk_offsets(&self) -> &Buffer<u64> {
106        &self.chunk_offsets
107    }
108
109    pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
110        assert!(index <= self.len(), "Index out of bounds of the array");
111        let index = index as u64;
112
113        // Since there might be duplicate values in offsets because of empty chunks we want to search from right
114        // and take the last chunk (we subtract 1 since there's a leading 0)
115        let index_chunk = self
116            .chunk_offsets()
117            .search_sorted(&index, SearchSortedSide::Right)
118            .to_ends_index(self.nchunks() + 1)
119            .saturating_sub(1);
120        let chunk_start = self.chunk_offsets()[index_chunk];
121
122        let index_in_chunk =
123            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
124        (index_chunk, index_in_chunk)
125    }
126
127    pub fn chunks(&self) -> &[ArrayRef] {
128        &self.chunks
129    }
130
131    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
132        self.chunks().iter().filter(|c| !c.is_empty())
133    }
134
135    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
136        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
137    }
138
139    pub fn array_stream(&self) -> impl ArrayStream + '_ {
140        ArrayStreamAdapter::new(
141            self.dtype().clone(),
142            stream::iter(self.chunks().iter().cloned().map(Ok)),
143        )
144    }
145
146    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
147        let mut new_chunks = Vec::new();
148        let mut chunks_to_combine = Vec::new();
149        let mut new_chunk_n_bytes = 0;
150        let mut new_chunk_n_elements = 0;
151        for chunk in self.chunks() {
152            let n_bytes = chunk.nbytes();
153            let n_elements = chunk.len();
154
155            if (new_chunk_n_bytes + n_bytes > target_bytesize
156                || new_chunk_n_elements + n_elements > target_rowsize)
157                && !chunks_to_combine.is_empty()
158            {
159                new_chunks.push(
160                    // SAFETY: chunks_to_combine contains valid chunks of the same dtype as self.
161                    // All chunks are guaranteed to be valid arrays matching self.dtype().
162                    unsafe {
163                        ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
164                            .to_canonical()
165                            .into_array()
166                    },
167                );
168
169                new_chunk_n_bytes = 0;
170                new_chunk_n_elements = 0;
171                chunks_to_combine = Vec::new();
172            }
173
174            if n_bytes > target_bytesize || n_elements > target_rowsize {
175                new_chunks.push(chunk.clone());
176            } else {
177                new_chunk_n_bytes += n_bytes;
178                new_chunk_n_elements += n_elements;
179                chunks_to_combine.push(chunk.clone());
180            }
181        }
182
183        if !chunks_to_combine.is_empty() {
184            new_chunks.push(unsafe {
185                // SAFETY: chunks_to_combine contains valid chunks of the same dtype as self.
186                // All chunks are guaranteed to be valid arrays matching self.dtype().
187                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
188                    .to_canonical()
189                    .into_array()
190            });
191        }
192
193        // SAFETY: new_chunks contains valid arrays of the same dtype as self.
194        // All chunks were either taken from self or created from self's chunks.
195        unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
196    }
197}
198
199impl FromIterator<ArrayRef> for ChunkedArray {
200    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
201        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
202        let dtype = chunks
203            .first()
204            .map(|c| c.dtype().clone())
205            .vortex_expect("Cannot infer DType from an empty iterator");
206        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
207    }
208}
209
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::arrays::{ChunkedVTable, PrimitiveArray};
    use crate::compute::sub_scalar;
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical, assert_arrays_eq};

    /// Three non-nullable `u64` chunks holding `1..=9`, three values per chunk.
    fn chunked_array() -> ChunkedArray {
        ChunkedArray::try_new(
            vec![
                buffer![1u64, 2, 3].into_array(),
                buffer![4u64, 5, 6].into_array(),
                buffer![7u64, 8, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap()
    }

    /// Lengths of each chunk of `array`, in order.
    fn chunk_lens(array: &ChunkedArray) -> Vec<usize> {
        array.chunks().iter().map(|c| c.len()).collect()
    }

    #[test]
    fn test_scalar_subtract() {
        let chunked = chunked_array().into_array();
        let to_subtract = 1u64;
        let array = sub_scalar(&chunked, to_subtract.into()).unwrap();

        // The result must still be chunked, with the subtraction applied chunk by chunk.
        let chunked = array.as_::<ChunkedVTable>();
        let expected: [&[u64]; 3] = [&[0, 1, 2], &[3, 4, 5], &[6, 7, 8]];
        assert_eq!(chunked.nchunks(), expected.len());
        for (chunk, want) in chunked.chunks().iter().zip(expected) {
            assert_eq!(chunk.to_primitive().as_slice::<u64>(), want);
        }
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // 4 + 2 rows would exceed the 5-row target, so the two chunks stay separate.
        assert_eq!(chunk_lens(&rechunked), vec![4, 2]);
        assert!(rechunked.chunks().iter().all(|c| c.len() <= 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // Greedy, so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9].
        // The oversized middle chunk passes through unchanged.
        assert_eq!(chunk_lens(&rechunked), vec![3, 6, 4, 2]);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all valid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_valid since all non-empty chunks are all_valid
        assert!(chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all invalid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_invalid since all non-empty chunks are all_invalid
        assert!(!chunked.all_valid());
        assert!(chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        // Create chunks with mixed validity including empty chunks
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be neither all_valid nor all_invalid
        assert!(!chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }
}