Skip to main content

vortex_array/arrays/chunked/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! First-class chunked arrays.
5//!
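//! Vortex is a chunked array library: a `ChunkedArray` presents a sequence of chunks, all sharing
//! one [`DType`], as a single logical array whose length is the sum of the chunk lengths.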
7
8use std::fmt::Debug;
9use std::fmt::Display;
10use std::fmt::Formatter;
11
12use futures::stream;
13use vortex_buffer::BufferMut;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17
18use crate::ArrayRef;
19use crate::IntoArray;
20use crate::array::Array;
21use crate::array::ArrayParts;
22use crate::array::TypedArrayRef;
23use crate::arrays::Chunked;
24use crate::arrays::PrimitiveArray;
25use crate::dtype::DType;
26use crate::iter::ArrayIterator;
27use crate::iter::ArrayIteratorAdapter;
28use crate::search_sorted::SearchSorted;
29use crate::search_sorted::SearchSortedSide;
30use crate::stream::ArrayStream;
31use crate::stream::ArrayStreamAdapter;
32use crate::validity::Validity;
33
/// Slot index holding the chunk-offsets array (built by `ChunkedData::make_slots` as a
/// non-nullable `u64` [`PrimitiveArray`]).
pub(super) const CHUNK_OFFSETS_SLOT: usize = 0;
/// Slot index of the first chunk; chunk `i` is stored in slot `CHUNKS_OFFSET + i`.
pub(super) const CHUNKS_OFFSET: usize = 1;

/// Per-array data for a chunked array.
#[derive(Clone, Debug)]
pub struct ChunkedData {
    /// Cumulative chunk offsets. As produced by `ChunkedData::compute_chunk_offsets`, this
    /// starts with `0`, has `nchunks + 1` entries, and ends with the total length.
    pub(super) chunk_offsets: Vec<usize>,
}
41
42impl Display for ChunkedData {
43    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44        write!(f, "nchunks: {}", self.chunk_offsets.len().saturating_sub(1))
45    }
46}
47
48pub trait ChunkedArrayExt: TypedArrayRef<Chunked> {
49    fn chunk_offsets_array(&self) -> &ArrayRef {
50        self.as_ref().slots()[CHUNK_OFFSETS_SLOT]
51            .as_ref()
52            .vortex_expect("validated chunk offsets slot")
53    }
54
55    fn nchunks(&self) -> usize {
56        self.as_ref().slots().len().saturating_sub(CHUNKS_OFFSET)
57    }
58
59    fn chunk(&self, idx: usize) -> &ArrayRef {
60        self.as_ref().slots()[CHUNKS_OFFSET + idx]
61            .as_ref()
62            .vortex_expect("validated chunk slot")
63    }
64
65    fn iter_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
66        Box::new(
67            self.as_ref().slots()[CHUNKS_OFFSET..]
68                .iter()
69                .map(|slot| slot.as_ref().vortex_expect("validated chunk slot")),
70        )
71    }
72
73    fn chunks(&self) -> Vec<ArrayRef> {
74        self.iter_chunks().cloned().collect()
75    }
76
77    fn non_empty_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
78        Box::new(self.iter_chunks().filter(|chunk| !chunk.is_empty()))
79    }
80
81    fn chunk_offsets(&self) -> &[usize] {
82        &self.chunk_offsets
83    }
84
85    fn find_chunk_idx(&self, index: usize) -> VortexResult<(usize, usize)> {
86        assert!(
87            index <= self.as_ref().len(),
88            "Index out of bounds of the array"
89        );
90        let chunk_offsets = self.chunk_offsets();
91        let index_chunk = chunk_offsets
92            .search_sorted(&index, SearchSortedSide::Right)?
93            .to_ends_index(self.nchunks() + 1)
94            .saturating_sub(1);
95        let chunk_start = chunk_offsets[index_chunk];
96        let index_in_chunk = index - chunk_start;
97        Ok((index_chunk, index_in_chunk))
98    }
99
100    fn array_iterator(&self) -> impl ArrayIterator + '_ {
101        ArrayIteratorAdapter::new(
102            self.as_ref().dtype().clone(),
103            self.iter_chunks().map(|chunk| Ok(chunk.clone())),
104        )
105    }
106
107    fn array_stream(&self) -> impl ArrayStream + '_ {
108        ArrayStreamAdapter::new(
109            self.as_ref().dtype().clone(),
110            stream::iter(self.iter_chunks().map(|chunk| Ok(chunk.clone()))),
111        )
112    }
113}
114impl<T: TypedArrayRef<Chunked>> ChunkedArrayExt for T {}
115
116impl ChunkedData {
117    pub(super) fn compute_chunk_offsets(chunks: &[ArrayRef]) -> Vec<usize> {
118        let mut chunk_offsets = Vec::with_capacity(chunks.len() + 1);
119        chunk_offsets.push(0);
120        let mut curr_offset = 0;
121        for chunk in chunks {
122            curr_offset += chunk.len();
123            chunk_offsets.push(curr_offset);
124        }
125        chunk_offsets
126    }
127
128    pub(super) fn make_slots(
129        chunk_offsets: &[usize],
130        chunks: &[ArrayRef],
131    ) -> Vec<Option<ArrayRef>> {
132        let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(chunk_offsets.len());
133        for &offset in chunk_offsets {
134            let offset = u64::try_from(offset)
135                .vortex_expect("chunk offset must fit in u64 for serialization");
136            unsafe { chunk_offsets_buf.push_unchecked(offset) }
137        }
138
139        let mut slots = Vec::with_capacity(1 + chunks.len());
140        slots.push(Some(
141            PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable).into_array(),
142        ));
143        slots.extend(chunks.iter().map(|c| Some(c.clone())));
144        slots
145    }
146
147    /// Validates the components that would be used to create a `ChunkedArray`.
148    ///
149    /// This function checks all the invariants required by `ChunkedArray::new_unchecked`.
150    pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
151        for chunk in chunks {
152            if chunk.dtype() != dtype {
153                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
154            }
155        }
156
157        Ok(())
158    }
159}
160
161impl Array<Chunked> {
162    /// Constructs a new `ChunkedArray`.
163    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
164        ChunkedData::validate(&chunks, &dtype)?;
165        let len = chunks.iter().map(|chunk| chunk.len()).sum();
166        let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
167        Ok(unsafe {
168            Array::from_parts_unchecked(
169                ArrayParts::new(
170                    Chunked,
171                    dtype,
172                    len,
173                    ChunkedData {
174                        chunk_offsets: chunk_offsets.clone(),
175                    },
176                )
177                .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
178            )
179        })
180    }
181
182    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
183        let mut new_chunks = Vec::new();
184        let mut chunks_to_combine = Vec::new();
185        let mut new_chunk_n_bytes = 0;
186        let mut new_chunk_n_elements = 0;
187        for chunk in self.iter_chunks() {
188            let n_bytes = chunk.nbytes();
189            let n_elements = chunk.len();
190
191            if (new_chunk_n_bytes + n_bytes > target_bytesize
192                || new_chunk_n_elements + n_elements > target_rowsize)
193                && !chunks_to_combine.is_empty()
194            {
195                new_chunks.push(
196                    unsafe {
197                        Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone())
198                    }
199                    .into_array()
200                    .to_canonical()?
201                    .into_array(),
202                );
203
204                new_chunk_n_bytes = 0;
205                new_chunk_n_elements = 0;
206                chunks_to_combine = Vec::new();
207            }
208
209            if n_bytes > target_bytesize || n_elements > target_rowsize {
210                new_chunks.push(chunk.clone());
211            } else {
212                new_chunk_n_bytes += n_bytes;
213                new_chunk_n_elements += n_elements;
214                chunks_to_combine.push(chunk.clone());
215            }
216        }
217
218        if !chunks_to_combine.is_empty() {
219            new_chunks.push(
220                unsafe { Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone()) }
221                    .into_array()
222                    .to_canonical()?
223                    .into_array(),
224            );
225        }
226
227        unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
228    }
229
230    /// Creates a new `ChunkedArray` without validation.
231    ///
232    /// # Safety
233    ///
234    /// All chunks must have exactly the same [`DType`] as the provided `dtype`.
235    pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
236        let len = chunks.iter().map(|chunk| chunk.len()).sum();
237        let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
238        unsafe {
239            Array::from_parts_unchecked(
240                ArrayParts::new(
241                    Chunked,
242                    dtype,
243                    len,
244                    ChunkedData {
245                        chunk_offsets: chunk_offsets.clone(),
246                    },
247                )
248                .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
249            )
250        }
251    }
252}
253
254impl FromIterator<ArrayRef> for Array<Chunked> {
255    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
256        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
257        let dtype = chunks
258            .first()
259            .map(|c| c.dtype().clone())
260            .vortex_expect("Cannot infer DType from an empty iterator");
261        Array::<Chunked>::try_new(chunks, dtype)
262            .vortex_expect("Failed to create chunked array from iterator")
263    }
264}
265
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::arrays::ChunkedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::chunked::ChunkedArrayExt;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::validity::Validity;

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.iter_chunks().all(|c| c.len() < 5));
        // 4 + 2 rows would exceed the target of 5, so the chunks stay separate.
        let lens: Vec<usize> = rechunked.iter_chunks().map(|c| c.len()).collect();
        assert_eq!(lens, vec![4, 2]);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        // greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]

        assert_eq!(rechunked.nchunks(), 4);
        let lens: Vec<usize> = rechunked.iter_chunks().map(|c| c.len()).collect();
        assert_eq!(lens, vec![3, 6, 4, 2]);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_find_chunk_idx_skips_empty_chunks() -> VortexResult<()> {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![0u64; 0].into_array(), // empty chunk
                buffer![3u64, 4].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )?;

        assert_eq!(chunked.find_chunk_idx(0)?, (0, 0));
        assert_eq!(chunked.find_chunk_idx(2)?, (0, 2));
        // Both the empty chunk and the last chunk start at offset 3; the lookup must land in
        // the last chunk, not the empty one.
        assert_eq!(chunked.find_chunk_idx(3)?, (2, 0));
        assert_eq!(chunked.find_chunk_idx(4)?, (2, 1));

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all valid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_valid since all non-empty chunks are all_valid
        assert!(chunked.all_valid().unwrap());
        assert!(!chunked.into_array().all_invalid().unwrap());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all invalid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_invalid since all non-empty chunks are all_invalid
        assert!(!chunked.all_valid().unwrap());
        assert!(chunked.into_array().all_invalid().unwrap());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        // Create chunks with mixed validity including empty chunks
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be neither all_valid nor all_invalid
        assert!(!chunked.all_valid().unwrap());
        assert!(!chunked.into_array().all_invalid().unwrap());

        Ok(())
    }
}