Skip to main content

vortex_array/arrays/chunked/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
//! First-class chunked arrays.
//!
//! A chunked array stores its rows as a sequence of child "chunk" arrays that
//! all share the same [`DType`], together with the starting row offset of each
//! chunk.
8use std::fmt::Debug;
9use std::fmt::Display;
10use std::fmt::Formatter;
11
12use futures::stream;
13use vortex_buffer::BufferMut;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17
18use crate::ArrayRef;
19use crate::IntoArray;
20use crate::array::Array;
21use crate::array::ArrayParts;
22use crate::array::TypedArrayRef;
23use crate::arrays::Chunked;
24use crate::arrays::PrimitiveArray;
25use crate::dtype::DType;
26use crate::iter::ArrayIterator;
27use crate::iter::ArrayIteratorAdapter;
28use crate::search_sorted::SearchSorted;
29use crate::search_sorted::SearchSortedSide;
30use crate::stream::ArrayStream;
31use crate::stream::ArrayStreamAdapter;
32use crate::validity::Validity;
33
/// Child-slot index holding the encoded chunk-offsets array.
pub(super) const CHUNK_OFFSETS_SLOT: usize = 0;
/// Child-slot index of the first chunk; chunk `i` lives at slot `CHUNKS_OFFSET + i`.
pub(super) const CHUNKS_OFFSET: usize = 1;
36
/// Encoding-specific metadata for a chunked array: the logical row offsets of
/// its chunks plus bookkeeping for incremental execution into a builder.
#[derive(Clone, Debug)]
pub struct ChunkedData {
    /// Monotonically non-decreasing row offsets. Entry `i` is the starting row
    /// of chunk `i` and the final entry is the total row count, so there are
    /// `nchunks + 1` entries (see `compute_chunk_offsets`).
    pub(super) chunk_offsets: Vec<usize>,
    /// This is used to find the next child to execute when in executing into a builder.
    pub(super) next_builder_slot: usize,
}
43
44impl Display for ChunkedData {
45    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
46        write!(f, "nchunks: {}", self.chunk_offsets.len().saturating_sub(1))
47    }
48}
49
/// Extension methods available on any typed reference to a chunked array.
pub trait ChunkedArrayExt: TypedArrayRef<Chunked> {
    /// Returns the child array holding the encoded chunk offsets.
    ///
    /// # Panics
    ///
    /// Panics if the offsets slot is unexpectedly empty.
    fn chunk_offsets_array(&self) -> &ArrayRef {
        self.as_ref().slots()[CHUNK_OFFSETS_SLOT]
            .as_ref()
            .vortex_expect("validated chunk offsets slot")
    }

    /// Returns the number of chunks (total slots minus the offsets slot).
    fn nchunks(&self) -> usize {
        self.as_ref().slots().len().saturating_sub(CHUNKS_OFFSET)
    }

    /// Returns the chunk at `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `idx >= nchunks()` or if the chunk slot is unexpectedly empty.
    fn chunk(&self, idx: usize) -> &ArrayRef {
        self.as_ref().slots()[CHUNKS_OFFSET + idx]
            .as_ref()
            .vortex_expect("validated chunk slot")
    }

    /// Iterates over all chunks in order, borrowing each.
    fn iter_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
        Box::new(
            self.as_ref().slots()[CHUNKS_OFFSET..]
                .iter()
                .map(|slot| slot.as_ref().vortex_expect("validated chunk slot")),
        )
    }

    /// Collects all chunks into an owned `Vec`.
    fn chunks(&self) -> Vec<ArrayRef> {
        self.iter_chunks().cloned().collect()
    }

    /// Iterates over chunks, skipping those with zero length.
    fn non_empty_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
        Box::new(self.iter_chunks().filter(|chunk| !chunk.is_empty()))
    }

    /// Returns the chunk-offsets slice (`nchunks + 1` entries; see `ChunkedData`).
    fn chunk_offsets(&self) -> &[usize] {
        // NOTE(review): this is a field access through `self`, which presumably
        // resolves via Deref to the array's `ChunkedData` — confirm.
        &self.chunk_offsets
    }

    /// Maps a logical row `index` to `(chunk index, index within that chunk)`.
    ///
    /// `index == len()` is permitted (yields a position one past the last row
    /// of the final chunk — presumably for end-of-array searches; confirm).
    ///
    /// # Panics
    ///
    /// Panics if `index > len()`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the binary search over the offsets.
    fn find_chunk_idx(&self, index: usize) -> VortexResult<(usize, usize)> {
        assert!(
            index <= self.as_ref().len(),
            "Index out of bounds of the array"
        );
        let chunk_offsets = self.chunk_offsets();
        // Right-sided search finds the first offset strictly greater than
        // `index`; stepping back one gives the chunk containing `index`.
        let index_chunk = chunk_offsets
            .search_sorted(&index, SearchSortedSide::Right)?
            .to_ends_index(self.nchunks() + 1)
            .saturating_sub(1);
        let chunk_start = chunk_offsets[index_chunk];
        let index_in_chunk = index - chunk_start;
        Ok((index_chunk, index_in_chunk))
    }

    /// Adapts the chunks into a synchronous [`ArrayIterator`].
    fn array_iterator(&self) -> impl ArrayIterator + '_ {
        ArrayIteratorAdapter::new(
            self.as_ref().dtype().clone(),
            self.iter_chunks().map(|chunk| Ok(chunk.clone())),
        )
    }

    /// Adapts the chunks into an asynchronous [`ArrayStream`].
    fn array_stream(&self) -> impl ArrayStream + '_ {
        ArrayStreamAdapter::new(
            self.as_ref().dtype().clone(),
            stream::iter(self.iter_chunks().map(|chunk| Ok(chunk.clone()))),
        )
    }
}
// Blanket impl: every typed chunked-array reference gets the extension methods.
impl<T: TypedArrayRef<Chunked>> ChunkedArrayExt for T {}
117
impl ChunkedData {
    /// Creates metadata from precomputed offsets, with the builder cursor
    /// positioned at the first chunk slot.
    pub(super) fn new(chunk_offsets: Vec<usize>) -> Self {
        Self {
            chunk_offsets,
            next_builder_slot: CHUNKS_OFFSET,
        }
    }

    /// Computes running row offsets for `chunks`: the result has
    /// `chunks.len() + 1` entries, starting at 0 and ending at the total row
    /// count.
    pub(super) fn compute_chunk_offsets(chunks: &[ArrayRef]) -> Vec<usize> {
        let mut chunk_offsets = Vec::with_capacity(chunks.len() + 1);
        chunk_offsets.push(0);
        let mut curr_offset = 0;
        for chunk in chunks {
            curr_offset += chunk.len();
            chunk_offsets.push(curr_offset);
        }
        chunk_offsets
    }

    /// Builds the child-slot vector: slot 0 holds the offsets encoded as a
    /// non-nullable u64 primitive array, followed by one slot per chunk.
    ///
    /// # Panics
    ///
    /// Panics if an offset does not fit in `u64` (only possible on targets
    /// where `usize` is wider than 64 bits).
    pub(super) fn make_slots(
        chunk_offsets: &[usize],
        chunks: &[ArrayRef],
    ) -> Vec<Option<ArrayRef>> {
        let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(chunk_offsets.len());
        for &offset in chunk_offsets {
            let offset = u64::try_from(offset)
                .vortex_expect("chunk offset must fit in u64 for serialization");
            // SAFETY: capacity for `chunk_offsets.len()` elements was reserved
            // above, and this loop pushes exactly one element per offset.
            unsafe { chunk_offsets_buf.push_unchecked(offset) }
        }

        let mut slots = Vec::with_capacity(1 + chunks.len());
        slots.push(Some(
            PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable).into_array(),
        ));
        slots.extend(chunks.iter().map(|c| Some(c.clone())));
        slots
    }

    /// Validates the components that would be used to create a `ChunkedArray`.
    ///
    /// This function checks all the invariants required by `ChunkedArray::new_unchecked`.
    ///
    /// # Errors
    ///
    /// Returns a mismatched-types error if any chunk's [`DType`] differs from
    /// `dtype`.
    pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
        for chunk in chunks {
            if chunk.dtype() != dtype {
                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
            }
        }

        Ok(())
    }
}
169
170impl Array<Chunked> {
171    pub(super) fn with_next_builder_slot(mut self, next_builder_slot: usize) -> Self {
172        if let Some(data) = self.data_mut() {
173            data.next_builder_slot = next_builder_slot;
174            return self;
175        }
176        // This is the slow path that will be hit at most once per execution since the second one
177        // *MUST* have execlusive access due to this copy.
178        let stats = self.statistics().to_owned();
179        let mut data = self.data().clone();
180        data.next_builder_slot = next_builder_slot;
181        // SAFETY: we only modified next_builder_slot which doesn't affect array invariants.
182        unsafe {
183            Array::from_parts_unchecked(
184                ArrayParts::new(Chunked, self.dtype().clone(), self.len(), data)
185                    .with_slots(self.slots().to_vec()),
186            )
187        }
188        .with_stats_set(stats)
189    }
190
191    /// Constructs a new `ChunkedArray`.
192    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
193        ChunkedData::validate(&chunks, &dtype)?;
194        // SAFETY just validated on previous line.
195        Ok(unsafe { Self::new_unchecked(chunks, dtype) })
196    }
197
198    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
199        let mut new_chunks = Vec::new();
200        let mut chunks_to_combine = Vec::new();
201        let mut new_chunk_n_bytes = 0;
202        let mut new_chunk_n_elements = 0;
203        for chunk in self.iter_chunks() {
204            let n_bytes = chunk.nbytes();
205            let n_elements = chunk.len();
206
207            if (new_chunk_n_bytes + n_bytes > target_bytesize
208                || new_chunk_n_elements + n_elements > target_rowsize)
209                && !chunks_to_combine.is_empty()
210            {
211                #[expect(deprecated)]
212                let canonical = unsafe {
213                    Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone())
214                }
215                .into_array()
216                .to_canonical()?
217                .into_array();
218                new_chunks.push(canonical);
219
220                new_chunk_n_bytes = 0;
221                new_chunk_n_elements = 0;
222                chunks_to_combine = Vec::new();
223            }
224
225            if n_bytes > target_bytesize || n_elements > target_rowsize {
226                new_chunks.push(chunk.clone());
227            } else {
228                new_chunk_n_bytes += n_bytes;
229                new_chunk_n_elements += n_elements;
230                chunks_to_combine.push(chunk.clone());
231            }
232        }
233
234        if !chunks_to_combine.is_empty() {
235            #[expect(deprecated)]
236            let canonical =
237                unsafe { Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone()) }
238                    .into_array()
239                    .to_canonical()?
240                    .into_array();
241            new_chunks.push(canonical);
242        }
243
244        unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
245    }
246
247    /// Creates a new `ChunkedArray` without validation.
248    ///
249    /// # Safety
250    ///
251    /// All chunks must have exactly the same [`DType`] as the provided `dtype`.
252    pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
253        let len = chunks.iter().map(|chunk| chunk.len()).sum();
254        let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
255        unsafe {
256            Array::from_parts_unchecked(
257                ArrayParts::new(Chunked, dtype, len, ChunkedData::new(chunk_offsets.clone()))
258                    .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
259            )
260        }
261    }
262}
263
264impl FromIterator<ArrayRef> for Array<Chunked> {
265    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
266        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
267        let dtype = chunks
268            .first()
269            .map(|c| c.dtype().clone())
270            .vortex_expect("Cannot infer DType from an empty iterator");
271        Array::<Chunked>::try_new(chunks, dtype)
272            .vortex_expect("Failed to create chunked array from iterator")
273    }
274}
275
#[cfg(test)]
mod test {
    //! Tests for construction, rechunking, and validity semantics of
    //! chunked arrays.

    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::ChunkedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::chunked::ChunkedArrayExt;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::validity::Validity;

    // Rechunking a single chunk within targets should be a no-op logically.
    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    // Two tiny chunks under the targets should be merged into one.
    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    // A small row target (5) forces a flush before the row count is exceeded.
    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.iter_chunks().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    // An individually oversized chunk passes through unchanged while the
    // surrounding small chunks are still combined greedily.
    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        // greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]

        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all valid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_valid since all non-empty chunks are all_valid
        assert!(chunked.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?);
        assert!(
            !chunked
                .into_array()
                .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())?
        );

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all invalid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_invalid since all non-empty chunks are all_invalid
        assert!(!chunked.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?);
        assert!(
            chunked
                .into_array()
                .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())?
        );

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        // Create chunks with mixed validity including empty chunks
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be neither all_valid nor all_invalid
        assert!(!chunked.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?);
        assert!(
            !chunked
                .into_array()
                .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())?
        );

        Ok(())
    }
}