vortex_array/arrays/chunked/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! First-class chunked arrays.
5//!
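//! Vortex is a chunked array library that's able to present a sequence of chunks sharing one
//! [`DType`] as a single logical array; see [`ChunkedArray`].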
7
8use std::fmt::Debug;
9
10use futures_util::stream;
11use itertools::Itertools;
12use vortex_buffer::{Buffer, BufferMut};
13use vortex_dtype::DType;
14use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
15use vortex_mask::Mask;
16
17use crate::arrays::ChunkedVTable;
18use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
19use crate::search_sorted::{SearchSorted, SearchSortedSide};
20use crate::stats::{ArrayStats, StatsSetRef};
21use crate::stream::{ArrayStream, ArrayStreamAdapter};
22use crate::vtable::{ArrayVTable, ValidityVTable};
23use crate::{Array, ArrayRef, IntoArray};
24
25#[derive(Clone, Debug)]
26pub struct ChunkedArray {
27    dtype: DType,
28    len: usize,
29    chunk_offsets: Buffer<u64>,
30    chunks: Vec<ArrayRef>,
31    stats_set: ArrayStats,
32}
33
34impl ChunkedArray {
35    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
36        for chunk in &chunks {
37            if chunk.dtype() != &dtype {
38                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
39            }
40        }
41
42        Ok(Self::new_unchecked(chunks, dtype))
43    }
44
45    pub fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
46        let nchunks = chunks.len();
47
48        let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
49        unsafe { chunk_offsets.push_unchecked(0) }
50        let mut curr_offset = 0;
51        for c in &chunks {
52            curr_offset += c.len() as u64;
53            unsafe { chunk_offsets.push_unchecked(curr_offset) }
54        }
55        assert_eq!(chunk_offsets.len(), nchunks + 1);
56
57        Self {
58            dtype,
59            len: curr_offset.try_into().vortex_unwrap(),
60            chunk_offsets: chunk_offsets.freeze(),
61            chunks,
62            stats_set: Default::default(),
63        }
64    }
65
66    // TODO(ngates): remove result
67    #[inline]
68    pub fn chunk(&self, idx: usize) -> VortexResult<&ArrayRef> {
69        if idx >= self.nchunks() {
70            vortex_bail!("chunk index {} > num chunks ({})", idx, self.nchunks());
71        }
72        Ok(&self.chunks[idx])
73    }
74
75    pub fn nchunks(&self) -> usize {
76        self.chunks.len()
77    }
78
79    #[inline]
80    pub fn chunk_offsets(&self) -> &Buffer<u64> {
81        &self.chunk_offsets
82    }
83
84    pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
85        assert!(index <= self.len(), "Index out of bounds of the array");
86        let index = index as u64;
87
88        // Since there might be duplicate values in offsets because of empty chunks we want to search from right
89        // and take the last chunk (we subtract 1 since there's a leading 0)
90        let index_chunk = self
91            .chunk_offsets()
92            .search_sorted(&index, SearchSortedSide::Right)
93            .to_ends_index(self.nchunks() + 1)
94            .saturating_sub(1);
95        let chunk_start = self.chunk_offsets()[index_chunk];
96
97        let index_in_chunk =
98            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
99        (index_chunk, index_in_chunk)
100    }
101
102    pub fn chunks(&self) -> &[ArrayRef] {
103        &self.chunks
104    }
105
106    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
107        self.chunks().iter().filter(|c| !c.is_empty())
108    }
109
110    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
111        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
112    }
113
114    pub fn array_stream(&self) -> impl ArrayStream + '_ {
115        ArrayStreamAdapter::new(
116            self.dtype().clone(),
117            stream::iter(self.chunks().iter().cloned().map(Ok)),
118        )
119    }
120
121    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
122        let mut new_chunks = Vec::new();
123        let mut chunks_to_combine = Vec::new();
124        let mut new_chunk_n_bytes = 0;
125        let mut new_chunk_n_elements = 0;
126        for chunk in self.chunks() {
127            let n_bytes = chunk.nbytes();
128            let n_elements = chunk.len();
129
130            if (new_chunk_n_bytes + n_bytes > target_bytesize
131                || new_chunk_n_elements + n_elements > target_rowsize)
132                && !chunks_to_combine.is_empty()
133            {
134                new_chunks.push(
135                    ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
136                        .to_canonical()?
137                        .into_array(),
138                );
139
140                new_chunk_n_bytes = 0;
141                new_chunk_n_elements = 0;
142                chunks_to_combine = Vec::new();
143            }
144
145            if n_bytes > target_bytesize || n_elements > target_rowsize {
146                new_chunks.push(chunk.clone());
147            } else {
148                new_chunk_n_bytes += n_bytes;
149                new_chunk_n_elements += n_elements;
150                chunks_to_combine.push(chunk.clone());
151            }
152        }
153
154        if !chunks_to_combine.is_empty() {
155            new_chunks.push(
156                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
157                    .to_canonical()?
158                    .into_array(),
159            );
160        }
161
162        Ok(Self::new_unchecked(new_chunks, self.dtype().clone()))
163    }
164}
165
166impl FromIterator<ArrayRef> for ChunkedArray {
167    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
168        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
169        let dtype = chunks
170            .first()
171            .map(|c| c.dtype().clone())
172            .vortex_expect("Cannot infer DType from an empty iterator");
173        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
174    }
175}
176
177impl ArrayVTable<ChunkedVTable> for ChunkedVTable {
178    fn len(array: &ChunkedArray) -> usize {
179        array.len
180    }
181
182    fn dtype(array: &ChunkedArray) -> &DType {
183        &array.dtype
184    }
185
186    fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
187        array.stats_set.to_ref(array.as_ref())
188    }
189}
190
191impl ValidityVTable<ChunkedVTable> for ChunkedVTable {
192    fn is_valid(array: &ChunkedArray, index: usize) -> VortexResult<bool> {
193        if !array.dtype.is_nullable() {
194            return Ok(true);
195        }
196        let (chunk, offset_in_chunk) = array.find_chunk_idx(index);
197        array.chunk(chunk)?.is_valid(offset_in_chunk)
198    }
199
200    fn all_valid(array: &ChunkedArray) -> VortexResult<bool> {
201        if !array.dtype().is_nullable() {
202            return Ok(true);
203        }
204        for chunk in array.chunks() {
205            if !chunk.all_valid()? {
206                return Ok(false);
207            }
208        }
209        Ok(true)
210    }
211
212    fn all_invalid(array: &ChunkedArray) -> VortexResult<bool> {
213        if !array.dtype().is_nullable() {
214            return Ok(false);
215        }
216        for chunk in array.chunks() {
217            if !chunk.all_invalid()? {
218                return Ok(false);
219            }
220        }
221        Ok(true)
222    }
223
224    fn validity_mask(array: &ChunkedArray) -> VortexResult<Mask> {
225        array
226            .chunks()
227            .iter()
228            .map(|a| a.validity_mask())
229            .try_collect()
230    }
231}
232
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::ChunkedVTable;
    use crate::arrays::chunked::ChunkedArray;
    use crate::compute::conformance::binary_numeric::test_numeric;
    use crate::compute::{cast, sub_scalar};
    use crate::{IntoArray, ToCanonical, assert_arrays_eq};

    /// Non-nullable `u64`, the element type of every fixture in this module.
    fn u64_dtype() -> DType {
        DType::Primitive(PType::U64, Nullability::NonNullable)
    }

    /// Three chunks of three elements each, holding 1 through 9.
    fn chunked_array() -> ChunkedArray {
        let chunks = vec![
            buffer![1u64, 2, 3].into_array(),
            buffer![4u64, 5, 6].into_array(),
            buffer![7u64, 8, 9].into_array(),
        ];
        ChunkedArray::try_new(chunks, u64_dtype()).unwrap()
    }

    #[test]
    fn test_scalar_subtract() {
        let chunked = chunked_array().into_array();
        let to_subtract = 1u64;
        let array = sub_scalar(&chunked, to_subtract.into()).unwrap();

        let chunked = array.as_::<ChunkedVTable>();
        let chunks_out = chunked.chunks();

        // Subtraction is applied chunk by chunk, so the chunk boundaries are unchanged.
        let expected: [&[u64]; 3] = [&[0, 1, 2], &[3, 4, 5], &[6, 7, 8]];
        for (idx, want) in expected.into_iter().enumerate() {
            let results = chunks_out[idx]
                .to_primitive()
                .unwrap()
                .as_slice::<u64>()
                .to_vec();
            assert_eq!(results, want);
        }
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunks = vec![buffer![0u64].into_array()];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunks = vec![buffer![0u64].into_array(), buffer![5u64].into_array()];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunks = vec![
            buffer![0u64, 1, 2, 3].into_array(),
            buffer![4u64, 5].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunks = vec![
            buffer![0u64, 1, 2].into_array(),
            buffer![42_u64; 6].into_array(),
            buffer![4u64, 5].into_array(),
            buffer![6u64, 7].into_array(),
            buffer![8u64, 9].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        // greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]

        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_chunked_binary_numeric() {
        let array = chunked_array();
        // The tests test both X - 1 and 1 - X, so we need signed values
        let signed_ptype = PType::try_from(array.dtype()).unwrap().to_signed();
        let signed_dtype = DType::from(signed_ptype);
        let array = cast(array.as_ref(), &signed_dtype).unwrap();
        test_numeric::<u64>(array)
    }
}