Skip to main content

vortex_array/arrays/chunked/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
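//! First-class chunked arrays.
//!
//! Vortex is a chunked array library that's able to represent one logical array as an ordered
//! sequence of chunks that all share a single `DType`, together with the cumulative row offset at
//! which each chunk starts.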
7
8use std::fmt::Debug;
9use std::fmt::Display;
10use std::fmt::Formatter;
11
12use futures::stream;
13use smallvec::SmallVec;
14use vortex_buffer::BufferMut;
15use vortex_error::VortexExpect;
16use vortex_error::VortexResult;
17use vortex_error::vortex_bail;
18
19use crate::ArrayRef;
20use crate::ArraySlots;
21use crate::IntoArray;
22use crate::array::Array;
23use crate::array::ArrayParts;
24use crate::array::TypedArrayRef;
25use crate::arrays::Chunked;
26use crate::arrays::PrimitiveArray;
27use crate::dtype::DType;
28use crate::iter::ArrayIterator;
29use crate::iter::ArrayIteratorAdapter;
30use crate::search_sorted::SearchSorted;
31use crate::search_sorted::SearchSortedSide;
32use crate::stream::ArrayStream;
33use crate::stream::ArrayStreamAdapter;
34use crate::validity::Validity;
35
36pub(super) const CHUNK_OFFSETS_SLOT: usize = 0;
37pub(super) const CHUNKS_OFFSET: usize = 1;
38
39#[derive(Clone, Debug)]
40pub struct ChunkedData {
41    pub(super) chunk_offsets: Vec<usize>,
42    /// This is used to find the next child to execute when in executing into a builder.
43    pub(super) next_builder_slot: usize,
44}
45
46impl Display for ChunkedData {
47    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48        write!(f, "nchunks: {}", self.chunk_offsets.len().saturating_sub(1))
49    }
50}
51
52pub trait ChunkedArrayExt: TypedArrayRef<Chunked> {
53    fn chunk_offsets_array(&self) -> &ArrayRef {
54        self.as_ref().slots()[CHUNK_OFFSETS_SLOT]
55            .as_ref()
56            .vortex_expect("validated chunk offsets slot")
57    }
58
59    fn nchunks(&self) -> usize {
60        self.as_ref().slots().len().saturating_sub(CHUNKS_OFFSET)
61    }
62
63    fn chunk(&self, idx: usize) -> &ArrayRef {
64        self.as_ref().slots()[CHUNKS_OFFSET + idx]
65            .as_ref()
66            .vortex_expect("validated chunk slot")
67    }
68
69    fn iter_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
70        Box::new(
71            self.as_ref().slots()[CHUNKS_OFFSET..]
72                .iter()
73                .map(|slot| slot.as_ref().vortex_expect("validated chunk slot")),
74        )
75    }
76
77    fn chunks(&self) -> Vec<ArrayRef> {
78        self.iter_chunks().cloned().collect()
79    }
80
81    fn non_empty_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
82        Box::new(self.iter_chunks().filter(|chunk| !chunk.is_empty()))
83    }
84
85    fn chunk_offsets(&self) -> &[usize] {
86        &self.chunk_offsets
87    }
88
89    fn find_chunk_idx(&self, index: usize) -> VortexResult<(usize, usize)> {
90        assert!(
91            index <= self.as_ref().len(),
92            "Index out of bounds of the array"
93        );
94        let chunk_offsets = self.chunk_offsets();
95        let index_chunk = chunk_offsets
96            .search_sorted(&index, SearchSortedSide::Right)?
97            .to_ends_index(self.nchunks() + 1)
98            .saturating_sub(1);
99        let chunk_start = chunk_offsets[index_chunk];
100        let index_in_chunk = index - chunk_start;
101        Ok((index_chunk, index_in_chunk))
102    }
103
104    fn array_iterator(&self) -> impl ArrayIterator + '_ {
105        ArrayIteratorAdapter::new(
106            self.as_ref().dtype().clone(),
107            self.iter_chunks().map(|chunk| Ok(chunk.clone())),
108        )
109    }
110
111    fn array_stream(&self) -> impl ArrayStream + '_ {
112        ArrayStreamAdapter::new(
113            self.as_ref().dtype().clone(),
114            stream::iter(self.iter_chunks().map(|chunk| Ok(chunk.clone()))),
115        )
116    }
117}
118impl<T: TypedArrayRef<Chunked>> ChunkedArrayExt for T {}
119
120impl ChunkedData {
121    pub(super) fn new(chunk_offsets: Vec<usize>) -> Self {
122        Self {
123            chunk_offsets,
124            next_builder_slot: CHUNKS_OFFSET,
125        }
126    }
127
128    pub(super) fn compute_chunk_offsets(chunks: &[ArrayRef]) -> Vec<usize> {
129        let mut chunk_offsets = Vec::with_capacity(chunks.len() + 1);
130        chunk_offsets.push(0);
131        let mut curr_offset = 0;
132        for chunk in chunks {
133            curr_offset += chunk.len();
134            chunk_offsets.push(curr_offset);
135        }
136        chunk_offsets
137    }
138
139    pub(super) fn make_slots(chunk_offsets: &[usize], chunks: &[ArrayRef]) -> ArraySlots {
140        let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(chunk_offsets.len());
141        for &offset in chunk_offsets {
142            let offset = u64::try_from(offset)
143                .vortex_expect("chunk offset must fit in u64 for serialization");
144            unsafe { chunk_offsets_buf.push_unchecked(offset) }
145        }
146
147        let mut slots = SmallVec::with_capacity(1 + chunks.len());
148        slots.push(Some(
149            PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable).into_array(),
150        ));
151        slots.extend(chunks.iter().map(|c| Some(c.clone())));
152        slots
153    }
154
155    /// Validates the components that would be used to create a `ChunkedArray`.
156    ///
157    /// This function checks all the invariants required by `ChunkedArray::new_unchecked`.
158    pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
159        for chunk in chunks {
160            if chunk.dtype() != dtype {
161                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
162            }
163        }
164
165        Ok(())
166    }
167}
168
169impl Array<Chunked> {
170    pub(super) fn with_next_builder_slot(mut self, next_builder_slot: usize) -> Self {
171        if let Some(data) = self.data_mut() {
172            data.next_builder_slot = next_builder_slot;
173            return self;
174        }
175        // This is the slow path that will be hit at most once per execution since the second one
176        // *MUST* have execlusive access due to this copy.
177        let stats = self.statistics().to_owned();
178        let mut data = self.data().clone();
179        data.next_builder_slot = next_builder_slot;
180        // SAFETY: we only modified next_builder_slot which doesn't affect array invariants.
181        unsafe {
182            Array::from_parts_unchecked(
183                ArrayParts::new(Chunked, self.dtype().clone(), self.len(), data)
184                    .with_slots(self.slots().iter().cloned().collect::<ArraySlots>()),
185            )
186        }
187        .with_stats_set(stats)
188    }
189
190    /// Constructs a new `ChunkedArray`.
191    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
192        ChunkedData::validate(&chunks, &dtype)?;
193        // SAFETY just validated on previous line.
194        Ok(unsafe { Self::new_unchecked(chunks, dtype) })
195    }
196
197    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
198        let mut new_chunks = Vec::new();
199        let mut chunks_to_combine = Vec::new();
200        let mut new_chunk_n_bytes = 0;
201        let mut new_chunk_n_elements = 0;
202        for chunk in self.iter_chunks() {
203            let n_bytes = chunk.nbytes();
204            let n_elements = chunk.len();
205
206            if (new_chunk_n_bytes + n_bytes > target_bytesize
207                || new_chunk_n_elements + n_elements > target_rowsize)
208                && !chunks_to_combine.is_empty()
209            {
210                #[expect(deprecated)]
211                let canonical = unsafe {
212                    Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone())
213                }
214                .into_array()
215                .to_canonical()?
216                .into_array();
217                new_chunks.push(canonical);
218
219                new_chunk_n_bytes = 0;
220                new_chunk_n_elements = 0;
221                chunks_to_combine = Vec::new();
222            }
223
224            if n_bytes > target_bytesize || n_elements > target_rowsize {
225                new_chunks.push(chunk.clone());
226            } else {
227                new_chunk_n_bytes += n_bytes;
228                new_chunk_n_elements += n_elements;
229                chunks_to_combine.push(chunk.clone());
230            }
231        }
232
233        if !chunks_to_combine.is_empty() {
234            #[expect(deprecated)]
235            let canonical =
236                unsafe { Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone()) }
237                    .into_array()
238                    .to_canonical()?
239                    .into_array();
240            new_chunks.push(canonical);
241        }
242
243        unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
244    }
245
246    /// Creates a new `ChunkedArray` without validation.
247    ///
248    /// # Safety
249    ///
250    /// All chunks must have exactly the same [`DType`] as the provided `dtype`.
251    pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
252        let len = chunks.iter().map(|chunk| chunk.len()).sum();
253        let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
254        unsafe {
255            Array::from_parts_unchecked(
256                ArrayParts::new(Chunked, dtype, len, ChunkedData::new(chunk_offsets.clone()))
257                    .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
258            )
259        }
260    }
261}
262
263impl FromIterator<ArrayRef> for Array<Chunked> {
264    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
265        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
266        let dtype = chunks
267            .first()
268            .map(|c| c.dtype().clone())
269            .vortex_expect("Cannot infer DType from an empty iterator");
270        Array::<Chunked>::try_new(chunks, dtype)
271            .vortex_expect("Failed to create chunked array from iterator")
272    }
273}
274
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::array_session;
    use crate::arrays::ChunkedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::chunked::ChunkedArrayExt;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::validity::Validity;

    /// Dtype of the non-nullable `u64` chunks used by the rechunk tests.
    fn non_nullable_u64() -> DType {
        DType::Primitive(PType::U64, Nullability::NonNullable)
    }

    /// Dtype of the nullable `u64` chunks used by the validity tests.
    fn nullable_u64() -> DType {
        DType::Primitive(PType::U64, Nullability::Nullable)
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let mut ctx = array_session().create_execution_ctx();
        let parts = vec![buffer![0u64].into_array()];
        let chunked = ChunkedArray::try_new(parts, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked, &mut ctx);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let mut ctx = array_session().create_execution_ctx();
        let parts = vec![buffer![0u64].into_array(), buffer![5u64].into_array()];
        let chunked = ChunkedArray::try_new(parts, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        // Both tiny chunks fit under the targets, so they collapse into one.
        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked, &mut ctx);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let mut ctx = array_session().create_execution_ctx();
        let parts = vec![
            buffer![0u64, 1, 2, 3].into_array(),
            buffer![4u64, 5].into_array(),
        ];
        let chunked = ChunkedArray::try_new(parts, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // 4 + 2 rows would exceed the 5-row target, so the chunks stay separate.
        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.iter_chunks().all(|piece| piece.len() < 5));
        assert_arrays_eq!(chunked, rechunked, &mut ctx);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let mut ctx = array_session().create_execution_ctx();
        let parts = vec![
            buffer![0u64, 1, 2].into_array(),
            buffer![42_u64; 6].into_array(),
            buffer![4u64, 5].into_array(),
            buffer![6u64, 7].into_array(),
            buffer![8u64, 9].into_array(),
        ];
        let chunked = ChunkedArray::try_new(parts, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        // Greedy grouping should yield: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]

        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked, &mut ctx);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Some chunks are empty; every non-empty chunk is entirely valid.
        let parts = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
        ];
        let chunked = ChunkedArray::try_new(parts, nullable_u64())?;

        // Empty chunks must not affect the answer: the array is all valid, not all invalid.
        let all_valid = chunked.all_valid(&mut array_session().create_execution_ctx())?;
        assert!(all_valid);
        let all_invalid = chunked
            .into_array()
            .all_invalid(&mut array_session().create_execution_ctx())?;
        assert!(!all_invalid);

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Some chunks are empty; every non-empty chunk is entirely invalid.
        let parts = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];
        let chunked = ChunkedArray::try_new(parts, nullable_u64())?;

        // Empty chunks must not affect the answer: the array is all invalid, not all valid.
        let all_valid = chunked.all_valid(&mut array_session().create_execution_ctx())?;
        assert!(!all_valid);
        let all_invalid = chunked
            .into_array()
            .all_invalid(&mut array_session().create_execution_ctx())?;
        assert!(all_invalid);

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        // One valid chunk and one invalid chunk, interleaved with empty chunks.
        let parts = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];
        let chunked = ChunkedArray::try_new(parts, nullable_u64())?;

        // Neither all valid nor all invalid.
        let all_valid = chunked.all_valid(&mut array_session().create_execution_ctx())?;
        assert!(!all_valid);
        let all_invalid = chunked
            .into_array()
            .all_invalid(&mut array_session().create_execution_ctx())?;
        assert!(!all_invalid);

        Ok(())
    }
}