Skip to main content

vortex_array/arrays/chunked/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! First-class chunked arrays.
5//!
//! Vortex is a chunked array library that's able to represent one logical array as a sequence
//! of chunks that all share the same dtype.
7
8use std::fmt::Debug;
9use std::fmt::Display;
10use std::fmt::Formatter;
11
12use futures::stream;
13use vortex_buffer::BufferMut;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17
18use crate::ArrayRef;
19use crate::IntoArray;
20use crate::array::Array;
21use crate::array::ArrayParts;
22use crate::array::TypedArrayRef;
23use crate::arrays::Chunked;
24use crate::arrays::PrimitiveArray;
25use crate::dtype::DType;
26use crate::iter::ArrayIterator;
27use crate::iter::ArrayIteratorAdapter;
28use crate::search_sorted::SearchSorted;
29use crate::search_sorted::SearchSortedSide;
30use crate::stream::ArrayStream;
31use crate::stream::ArrayStreamAdapter;
32use crate::validity::Validity;
33
/// Index of the slot that holds the chunk offsets array.
pub(super) const CHUNK_OFFSETS_SLOT: usize = 0;
/// Index of the first chunk slot; chunk `i` is read from slot `CHUNKS_OFFSET + i`.
pub(super) const CHUNKS_OFFSET: usize = 1;
36
/// Data stored alongside the slots of a [`Chunked`] array.
#[derive(Clone, Debug)]
pub struct ChunkedData {
    /// Cumulative chunk lengths as produced by `ChunkedData::compute_chunk_offsets`: a leading
    /// `0` followed by one running total per chunk, i.e. `nchunks + 1` entries.
    pub(super) chunk_offsets: Vec<usize>,
}
41
42impl Display for ChunkedData {
43    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44        write!(f, "nchunks: {}", self.chunk_offsets.len().saturating_sub(1))
45    }
46}
47
48pub trait ChunkedArrayExt: TypedArrayRef<Chunked> {
49    fn chunk_offsets_array(&self) -> &ArrayRef {
50        self.as_ref().slots()[CHUNK_OFFSETS_SLOT]
51            .as_ref()
52            .vortex_expect("validated chunk offsets slot")
53    }
54
55    fn nchunks(&self) -> usize {
56        self.as_ref().slots().len().saturating_sub(CHUNKS_OFFSET)
57    }
58
59    fn chunk(&self, idx: usize) -> &ArrayRef {
60        self.as_ref().slots()[CHUNKS_OFFSET + idx]
61            .as_ref()
62            .vortex_expect("validated chunk slot")
63    }
64
65    fn iter_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
66        Box::new(
67            self.as_ref().slots()[CHUNKS_OFFSET..]
68                .iter()
69                .map(|slot| slot.as_ref().vortex_expect("validated chunk slot")),
70        )
71    }
72
73    fn chunks(&self) -> Vec<ArrayRef> {
74        self.iter_chunks().cloned().collect()
75    }
76
77    fn non_empty_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
78        Box::new(self.iter_chunks().filter(|chunk| !chunk.is_empty()))
79    }
80
81    fn chunk_offsets(&self) -> &[usize] {
82        &self.chunk_offsets
83    }
84
85    fn find_chunk_idx(&self, index: usize) -> VortexResult<(usize, usize)> {
86        assert!(
87            index <= self.as_ref().len(),
88            "Index out of bounds of the array"
89        );
90        let chunk_offsets = self.chunk_offsets();
91        let index_chunk = chunk_offsets
92            .search_sorted(&index, SearchSortedSide::Right)?
93            .to_ends_index(self.nchunks() + 1)
94            .saturating_sub(1);
95        let chunk_start = chunk_offsets[index_chunk];
96        let index_in_chunk = index - chunk_start;
97        Ok((index_chunk, index_in_chunk))
98    }
99
100    fn array_iterator(&self) -> impl ArrayIterator + '_ {
101        ArrayIteratorAdapter::new(
102            self.as_ref().dtype().clone(),
103            self.iter_chunks().map(|chunk| Ok(chunk.clone())),
104        )
105    }
106
107    fn array_stream(&self) -> impl ArrayStream + '_ {
108        ArrayStreamAdapter::new(
109            self.as_ref().dtype().clone(),
110            stream::iter(self.iter_chunks().map(|chunk| Ok(chunk.clone()))),
111        )
112    }
113}
114impl<T: TypedArrayRef<Chunked>> ChunkedArrayExt for T {}
115
116impl ChunkedData {
117    pub(super) fn compute_chunk_offsets(chunks: &[ArrayRef]) -> Vec<usize> {
118        let mut chunk_offsets = Vec::with_capacity(chunks.len() + 1);
119        chunk_offsets.push(0);
120        let mut curr_offset = 0;
121        for chunk in chunks {
122            curr_offset += chunk.len();
123            chunk_offsets.push(curr_offset);
124        }
125        chunk_offsets
126    }
127
128    pub(super) fn make_slots(
129        chunk_offsets: &[usize],
130        chunks: &[ArrayRef],
131    ) -> Vec<Option<ArrayRef>> {
132        let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(chunk_offsets.len());
133        for &offset in chunk_offsets {
134            let offset = u64::try_from(offset)
135                .vortex_expect("chunk offset must fit in u64 for serialization");
136            unsafe { chunk_offsets_buf.push_unchecked(offset) }
137        }
138
139        let mut slots = Vec::with_capacity(1 + chunks.len());
140        slots.push(Some(
141            PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable).into_array(),
142        ));
143        slots.extend(chunks.iter().map(|c| Some(c.clone())));
144        slots
145    }
146
147    /// Validates the components that would be used to create a `ChunkedArray`.
148    ///
149    /// This function checks all the invariants required by `ChunkedArray::new_unchecked`.
150    pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
151        for chunk in chunks {
152            if chunk.dtype() != dtype {
153                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
154            }
155        }
156
157        Ok(())
158    }
159}
160
161impl Array<Chunked> {
162    /// Constructs a new `ChunkedArray`.
163    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
164        ChunkedData::validate(&chunks, &dtype)?;
165        let len = chunks.iter().map(|chunk| chunk.len()).sum();
166        let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
167        Ok(unsafe {
168            Array::from_parts_unchecked(
169                ArrayParts::new(
170                    Chunked,
171                    dtype,
172                    len,
173                    ChunkedData {
174                        chunk_offsets: chunk_offsets.clone(),
175                    },
176                )
177                .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
178            )
179        })
180    }
181
182    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
183        let mut new_chunks = Vec::new();
184        let mut chunks_to_combine = Vec::new();
185        let mut new_chunk_n_bytes = 0;
186        let mut new_chunk_n_elements = 0;
187        for chunk in self.iter_chunks() {
188            let n_bytes = chunk.nbytes();
189            let n_elements = chunk.len();
190
191            if (new_chunk_n_bytes + n_bytes > target_bytesize
192                || new_chunk_n_elements + n_elements > target_rowsize)
193                && !chunks_to_combine.is_empty()
194            {
195                new_chunks.push(
196                    unsafe {
197                        Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone())
198                    }
199                    .into_array()
200                    .to_canonical()?
201                    .into_array(),
202                );
203
204                new_chunk_n_bytes = 0;
205                new_chunk_n_elements = 0;
206                chunks_to_combine = Vec::new();
207            }
208
209            if n_bytes > target_bytesize || n_elements > target_rowsize {
210                new_chunks.push(chunk.clone());
211            } else {
212                new_chunk_n_bytes += n_bytes;
213                new_chunk_n_elements += n_elements;
214                chunks_to_combine.push(chunk.clone());
215            }
216        }
217
218        if !chunks_to_combine.is_empty() {
219            new_chunks.push(
220                unsafe { Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone()) }
221                    .into_array()
222                    .to_canonical()?
223                    .into_array(),
224            );
225        }
226
227        unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
228    }
229
230    /// Creates a new `ChunkedArray` without validation.
231    ///
232    /// # Safety
233    ///
234    /// All chunks must have exactly the same [`DType`] as the provided `dtype`.
235    pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
236        let len = chunks.iter().map(|chunk| chunk.len()).sum();
237        let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
238        unsafe {
239            Array::from_parts_unchecked(
240                ArrayParts::new(
241                    Chunked,
242                    dtype,
243                    len,
244                    ChunkedData {
245                        chunk_offsets: chunk_offsets.clone(),
246                    },
247                )
248                .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
249            )
250        }
251    }
252}
253
254impl FromIterator<ArrayRef> for Array<Chunked> {
255    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
256        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
257        let dtype = chunks
258            .first()
259            .map(|c| c.dtype().clone())
260            .vortex_expect("Cannot infer DType from an empty iterator");
261        Array::<Chunked>::try_new(chunks, dtype)
262            .vortex_expect("Failed to create chunked array from iterator")
263    }
264}
265
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::ChunkedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::chunked::ChunkedArrayExt;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::validity::Validity;

    /// Dtype of the non-nullable `u64` fixtures used by the rechunk tests.
    fn non_nullable_u64() -> DType {
        DType::Primitive(PType::U64, Nullability::NonNullable)
    }

    /// Dtype of the nullable `u64` fixtures used by the validity tests.
    fn nullable_u64() -> DType {
        DType::Primitive(PType::U64, Nullability::Nullable)
    }

    /// Checks the array-wide `all_valid` answer first and the `all_invalid` answer second.
    fn assert_validity(chunked: ChunkedArray, expect_all_valid: bool, expect_all_invalid: bool) {
        let all_valid = chunked
            .all_valid(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        assert_eq!(all_valid, expect_all_valid);

        let all_invalid = chunked
            .into_array()
            .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap();
        assert_eq!(all_invalid, expect_all_invalid);
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunks = vec![buffer![0u64].into_array()];
        let chunked = ChunkedArray::try_new(chunks, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunks = vec![buffer![0u64].into_array(), buffer![5u64].into_array()];
        let chunked = ChunkedArray::try_new(chunks, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        // Both single-row chunks fit the targets together, so they collapse into one.
        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunks = vec![
            buffer![0u64, 1, 2, 3].into_array(),
            buffer![4u64, 5].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // 4 + 2 rows exceeds the 5-row target, so the two chunks stay separate.
        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.iter_chunks().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunks = vec![
            buffer![0u64, 1, 2].into_array(),
            buffer![42_u64; 6].into_array(),
            buffer![4u64, 5].into_array(),
            buffer![6u64, 7].into_array(),
            buffer![8u64, 9].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        // greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]

        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Some chunks are empty, and every non-empty chunk is entirely valid.
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
        ];
        let chunked = ChunkedArray::try_new(chunks, nullable_u64())?;

        // Should be all_valid since all non-empty chunks are all_valid
        assert_validity(chunked, true, false);

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Some chunks are empty, and every non-empty chunk is entirely invalid.
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];
        let chunked = ChunkedArray::try_new(chunks, nullable_u64())?;

        // Should be all_invalid since all non-empty chunks are all_invalid
        assert_validity(chunked, false, true);

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        // Valid and invalid non-empty chunks interleaved with empty chunks.
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];
        let chunked = ChunkedArray::try_new(chunks, nullable_u64())?;

        // Should be neither all_valid nor all_invalid
        assert_validity(chunked, false, false);

        Ok(())
    }
}