Skip to main content

vortex_array/arrays/chunked/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! First-class chunked arrays.
5//!
//! Vortex is a chunked array library that's able to represent one logical array as an
//! ordered sequence of chunks that all share the same [`DType`].
7
8use std::fmt::Debug;
9use std::fmt::Display;
10use std::fmt::Formatter;
11
12use futures::stream;
13use smallvec::SmallVec;
14use vortex_buffer::BufferMut;
15use vortex_error::VortexExpect;
16use vortex_error::VortexResult;
17use vortex_error::vortex_bail;
18
19use crate::ArrayRef;
20use crate::ArraySlots;
21use crate::IntoArray;
22use crate::array::Array;
23use crate::array::ArrayParts;
24use crate::array::TypedArrayRef;
25use crate::arrays::Chunked;
26use crate::arrays::PrimitiveArray;
27use crate::dtype::DType;
28use crate::iter::ArrayIterator;
29use crate::iter::ArrayIteratorAdapter;
30use crate::search_sorted::SearchSorted;
31use crate::search_sorted::SearchSortedSide;
32use crate::stream::ArrayStream;
33use crate::stream::ArrayStreamAdapter;
34use crate::validity::Validity;
35
36pub(super) const CHUNK_OFFSETS_SLOT: usize = 0;
37pub(super) const CHUNKS_OFFSET: usize = 1;
38
39#[derive(Clone, Debug)]
40pub struct ChunkedData {
41    pub(super) chunk_offsets: Vec<usize>,
42    /// This is used to find the next child to execute when in executing into a builder.
43    pub(super) next_builder_slot: usize,
44}
45
46impl Display for ChunkedData {
47    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48        write!(f, "nchunks: {}", self.chunk_offsets.len().saturating_sub(1))
49    }
50}
51
52pub trait ChunkedArrayExt: TypedArrayRef<Chunked> {
53    fn chunk_offsets_array(&self) -> &ArrayRef {
54        self.as_ref().slots()[CHUNK_OFFSETS_SLOT]
55            .as_ref()
56            .vortex_expect("validated chunk offsets slot")
57    }
58
59    fn nchunks(&self) -> usize {
60        self.as_ref().slots().len().saturating_sub(CHUNKS_OFFSET)
61    }
62
63    fn chunk(&self, idx: usize) -> &ArrayRef {
64        self.as_ref().slots()[CHUNKS_OFFSET + idx]
65            .as_ref()
66            .vortex_expect("validated chunk slot")
67    }
68
69    fn iter_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
70        Box::new(
71            self.as_ref().slots()[CHUNKS_OFFSET..]
72                .iter()
73                .map(|slot| slot.as_ref().vortex_expect("validated chunk slot")),
74        )
75    }
76
77    fn chunks(&self) -> Vec<ArrayRef> {
78        self.iter_chunks().cloned().collect()
79    }
80
81    fn non_empty_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
82        Box::new(self.iter_chunks().filter(|chunk| !chunk.is_empty()))
83    }
84
85    fn chunk_offsets(&self) -> &[usize] {
86        &self.chunk_offsets
87    }
88
89    fn find_chunk_idx(&self, index: usize) -> VortexResult<(usize, usize)> {
90        assert!(
91            index <= self.as_ref().len(),
92            "Index out of bounds of the array"
93        );
94        let chunk_offsets = self.chunk_offsets();
95        let index_chunk = chunk_offsets
96            .search_sorted(&index, SearchSortedSide::Right)?
97            .to_ends_index(self.nchunks() + 1)
98            .saturating_sub(1);
99        let chunk_start = chunk_offsets[index_chunk];
100        let index_in_chunk = index - chunk_start;
101        Ok((index_chunk, index_in_chunk))
102    }
103
104    fn array_iterator(&self) -> impl ArrayIterator + '_ {
105        ArrayIteratorAdapter::new(
106            self.as_ref().dtype().clone(),
107            self.iter_chunks().map(|chunk| Ok(chunk.clone())),
108        )
109    }
110
111    fn array_stream(&self) -> impl ArrayStream + '_ {
112        ArrayStreamAdapter::new(
113            self.as_ref().dtype().clone(),
114            stream::iter(self.iter_chunks().map(|chunk| Ok(chunk.clone()))),
115        )
116    }
117}
118impl<T: TypedArrayRef<Chunked>> ChunkedArrayExt for T {}
119
120impl ChunkedData {
121    pub(super) fn new(chunk_offsets: Vec<usize>) -> Self {
122        Self {
123            chunk_offsets,
124            next_builder_slot: CHUNKS_OFFSET,
125        }
126    }
127
128    pub(super) fn compute_chunk_offsets(chunks: &[ArrayRef]) -> Vec<usize> {
129        let mut chunk_offsets = Vec::with_capacity(chunks.len() + 1);
130        chunk_offsets.push(0);
131        let mut curr_offset = 0;
132        for chunk in chunks {
133            curr_offset += chunk.len();
134            chunk_offsets.push(curr_offset);
135        }
136        chunk_offsets
137    }
138
139    pub(super) fn make_slots(chunk_offsets: &[usize], chunks: &[ArrayRef]) -> ArraySlots {
140        let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(chunk_offsets.len());
141        for &offset in chunk_offsets {
142            let offset = u64::try_from(offset)
143                .vortex_expect("chunk offset must fit in u64 for serialization");
144            unsafe { chunk_offsets_buf.push_unchecked(offset) }
145        }
146
147        let mut slots = SmallVec::with_capacity(1 + chunks.len());
148        slots.push(Some(
149            PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable).into_array(),
150        ));
151        slots.extend(chunks.iter().map(|c| Some(c.clone())));
152        slots
153    }
154
155    /// Validates the components that would be used to create a `ChunkedArray`.
156    ///
157    /// This function checks all the invariants required by `ChunkedArray::new_unchecked`.
158    pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
159        for chunk in chunks {
160            if chunk.dtype() != dtype {
161                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
162            }
163        }
164
165        Ok(())
166    }
167}
168
169impl Array<Chunked> {
170    pub(super) fn with_next_builder_slot(mut self, next_builder_slot: usize) -> Self {
171        if let Some(data) = self.data_mut() {
172            data.next_builder_slot = next_builder_slot;
173            return self;
174        }
175        // This is the slow path that will be hit at most once per execution since the second one
176        // *MUST* have execlusive access due to this copy.
177        let stats = self.statistics().to_owned();
178        let mut data = self.data().clone();
179        data.next_builder_slot = next_builder_slot;
180        // SAFETY: we only modified next_builder_slot which doesn't affect array invariants.
181        unsafe {
182            Array::from_parts_unchecked(
183                ArrayParts::new(Chunked, self.dtype().clone(), self.len(), data)
184                    .with_slots(self.slots().iter().cloned().collect::<ArraySlots>()),
185            )
186        }
187        .with_stats_set(stats)
188    }
189
190    /// Constructs a new `ChunkedArray`.
191    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
192        ChunkedData::validate(&chunks, &dtype)?;
193        // SAFETY just validated on previous line.
194        Ok(unsafe { Self::new_unchecked(chunks, dtype) })
195    }
196
197    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
198        let mut new_chunks = Vec::new();
199        let mut chunks_to_combine = Vec::new();
200        let mut new_chunk_n_bytes = 0;
201        let mut new_chunk_n_elements = 0;
202        for chunk in self.iter_chunks() {
203            let n_bytes = chunk.nbytes();
204            let n_elements = chunk.len();
205
206            if (new_chunk_n_bytes + n_bytes > target_bytesize
207                || new_chunk_n_elements + n_elements > target_rowsize)
208                && !chunks_to_combine.is_empty()
209            {
210                #[expect(deprecated)]
211                let canonical = unsafe {
212                    Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone())
213                }
214                .into_array()
215                .to_canonical()?
216                .into_array();
217                new_chunks.push(canonical);
218
219                new_chunk_n_bytes = 0;
220                new_chunk_n_elements = 0;
221                chunks_to_combine = Vec::new();
222            }
223
224            if n_bytes > target_bytesize || n_elements > target_rowsize {
225                new_chunks.push(chunk.clone());
226            } else {
227                new_chunk_n_bytes += n_bytes;
228                new_chunk_n_elements += n_elements;
229                chunks_to_combine.push(chunk.clone());
230            }
231        }
232
233        if !chunks_to_combine.is_empty() {
234            #[expect(deprecated)]
235            let canonical =
236                unsafe { Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone()) }
237                    .into_array()
238                    .to_canonical()?
239                    .into_array();
240            new_chunks.push(canonical);
241        }
242
243        unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
244    }
245
246    /// Creates a new `ChunkedArray` without validation.
247    ///
248    /// # Safety
249    ///
250    /// All chunks must have exactly the same [`DType`] as the provided `dtype`.
251    pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
252        let len = chunks.iter().map(|chunk| chunk.len()).sum();
253        let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
254        unsafe {
255            Array::from_parts_unchecked(
256                ArrayParts::new(Chunked, dtype, len, ChunkedData::new(chunk_offsets.clone()))
257                    .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
258            )
259        }
260    }
261}
262
263impl FromIterator<ArrayRef> for Array<Chunked> {
264    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
265        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
266        let dtype = chunks
267            .first()
268            .map(|c| c.dtype().clone())
269            .vortex_expect("Cannot infer DType from an empty iterator");
270        Array::<Chunked>::try_new(chunks, dtype)
271            .vortex_expect("Failed to create chunked array from iterator")
272    }
273}
274
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::ChunkedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::chunked::ChunkedArrayExt;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::validity::Validity;

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_eq!(rechunked.chunk(0).len(), 2);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // 4 + 2 = 6 rows would exceed the 5-row target, so the chunks stay separate.
        assert_eq!(rechunked.nchunks(), 2);
        let chunk_lens: Vec<usize> = rechunked.iter_chunks().map(|c| c.len()).collect();
        assert_eq!(chunk_lens, vec![4, 2]);
        assert!(rechunked.iter_chunks().all(|c| c.len() <= 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        // greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]

        assert_eq!(rechunked.nchunks(), 4);
        let chunk_lens: Vec<usize> = rechunked.iter_chunks().map(|c| c.len()).collect();
        assert_eq!(chunk_lens, vec![3, 6, 4, 2]);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all valid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_valid since all non-empty chunks are all_valid
        assert!(chunked.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?);
        assert!(
            !chunked
                .into_array()
                .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())?
        );

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all invalid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_invalid since all non-empty chunks are all_invalid
        assert!(!chunked.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?);
        assert!(
            chunked
                .into_array()
                .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())?
        );

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        // Create chunks with mixed validity including empty chunks
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be neither all_valid nor all_invalid
        assert!(!chunked.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?);
        assert!(
            !chunked
                .into_array()
                .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())?
        );

        Ok(())
    }
}
425        );
426
427        Ok(())
428    }
429}