vortex_array/arrays/chunked/array.rs

//! First-class chunked arrays.
//!
//! Vortex is a chunked array library: a [`ChunkedArray`] holds a sequence of chunks that all
//! share the same [`DType`], together with precomputed chunk offsets for index lookups.
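//!
//! A minimal construction sketch, mirroring the tests at the bottom of this file (marked
//! `ignore` because it is illustrative rather than a compiled doctest):
//!
//! ```ignore
//! use vortex_buffer::buffer;
//! use vortex_dtype::{DType, Nullability, PType};
//!
//! // Two chunks of the same primitive DType, concatenated logically but not physically.
//! let chunked = ChunkedArray::try_new(
//!     vec![buffer![1u64, 2, 3].into_array(), buffer![4u64, 5, 6].into_array()],
//!     DType::Primitive(PType::U64, Nullability::NonNullable),
//! )?;
//! assert_eq!(chunked.nchunks(), 2);
//! assert_eq!(chunked.len(), 6);
//! ```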

use std::fmt::Debug;

use futures_util::stream;
use itertools::Itertools;
use vortex_buffer::{Buffer, BufferMut};
use vortex_dtype::DType;
use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
use vortex_mask::Mask;

use crate::arrays::ChunkedVTable;
use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
use crate::search_sorted::{SearchSorted, SearchSortedSide};
use crate::stats::{ArrayStats, StatsSetRef};
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::vtable::{ArrayVTable, ValidityVTable};
use crate::{Array, ArrayRef, IntoArray};

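/// An array stored as a sequence of chunks that all share the same [`DType`].
///
/// `chunk_offsets` always holds `chunks.len() + 1` entries starting at 0, so chunk `i` covers
/// the logical rows `chunk_offsets[i]..chunk_offsets[i + 1]`.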
#[derive(Clone, Debug)]
pub struct ChunkedArray {
    dtype: DType,
    len: usize,
    chunk_offsets: Buffer<u64>,
    chunks: Vec<ArrayRef>,
    stats_set: ArrayStats,
}

impl ChunkedArray {
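    /// Creates a chunked array from `chunks`, returning an error if any chunk's dtype does not
    /// match `dtype`.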
    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
        for chunk in &chunks {
            if chunk.dtype() != &dtype {
                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
            }
        }

        Ok(Self::new_unchecked(chunks, dtype))
    }

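    /// Creates a chunked array without validating that the chunks match `dtype`.
    ///
    /// The caller must ensure every chunk has dtype `dtype`; this constructor only computes the
    /// chunk offsets.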
    pub fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
        let nchunks = chunks.len();

        let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
        unsafe { chunk_offsets.push_unchecked(0) }
        let mut curr_offset = 0;
        for c in &chunks {
            curr_offset += c.len() as u64;
            unsafe { chunk_offsets.push_unchecked(curr_offset) }
        }
        assert_eq!(chunk_offsets.len(), nchunks + 1);

        Self {
            dtype,
            len: curr_offset.try_into().vortex_unwrap(),
            chunk_offsets: chunk_offsets.freeze(),
            chunks,
            stats_set: Default::default(),
        }
    }

    // TODO(ngates): remove result
    /// Returns a reference to the chunk at `idx`, or an error if `idx` is out of bounds.
    #[inline]
    pub fn chunk(&self, idx: usize) -> VortexResult<&ArrayRef> {
        if idx >= self.nchunks() {
            vortex_bail!("chunk index {} >= num chunks ({})", idx, self.nchunks());
        }
        Ok(&self.chunks[idx])
    }

    pub fn nchunks(&self) -> usize {
        self.chunks.len()
    }

    #[inline]
    pub fn chunk_offsets(&self) -> &Buffer<u64> {
        &self.chunk_offsets
    }

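    /// Resolves a logical `index` into a `(chunk index, index within that chunk)` pair.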
    pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
        assert!(index <= self.len(), "Index out of bounds of the array");
        let index = index as u64;

        // Empty chunks produce duplicate values in the offsets, so we search from the right and
        // take the last matching chunk (subtracting 1 to account for the leading 0).
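        // For example, with chunk lengths [3, 0, 3] the offsets are [0, 3, 3, 6]: looking up
        // index 3 resolves to chunk 2 (starting at offset 3) rather than the empty chunk 1.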
        let index_chunk = self
            .chunk_offsets()
            .search_sorted(&index, SearchSortedSide::Right)
            .to_ends_index(self.nchunks() + 1)
            .saturating_sub(1);
        let chunk_start = self.chunk_offsets()[index_chunk];

        let index_in_chunk =
            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
        (index_chunk, index_in_chunk)
    }

    pub fn chunks(&self) -> &[ArrayRef] {
        &self.chunks
    }

    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
        self.chunks().iter().filter(|c| !c.is_empty())
    }

    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
    }

    pub fn array_stream(&self) -> impl ArrayStream + '_ {
        ArrayStreamAdapter::new(
            self.dtype().clone(),
            stream::iter(self.chunks().iter().cloned().map(Ok)),
        )
    }

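    /// Greedily coalesces adjacent chunks into canonicalized chunks of at most `target_bytesize`
    /// bytes and `target_rowsize` rows; chunks that individually exceed either target are kept
    /// as-is.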
    pub fn rechunk(&self, target_bytesize: usize, target_rowsize: usize) -> VortexResult<Self> {
        let mut new_chunks = Vec::new();
        let mut chunks_to_combine = Vec::new();
        let mut new_chunk_n_bytes = 0;
        let mut new_chunk_n_elements = 0;
        for chunk in self.chunks() {
            let n_bytes = chunk.nbytes();
            let n_elements = chunk.len();

            if (new_chunk_n_bytes + n_bytes > target_bytesize
                || new_chunk_n_elements + n_elements > target_rowsize)
                && !chunks_to_combine.is_empty()
            {
                new_chunks.push(
                    ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
                        .to_canonical()?
                        .into_array(),
                );

                new_chunk_n_bytes = 0;
                new_chunk_n_elements = 0;
                chunks_to_combine = Vec::new();
            }

            if n_bytes > target_bytesize || n_elements > target_rowsize {
                new_chunks.push(chunk.clone());
            } else {
                new_chunk_n_bytes += n_bytes;
                new_chunk_n_elements += n_elements;
                chunks_to_combine.push(chunk.clone());
            }
        }

        if !chunks_to_combine.is_empty() {
            new_chunks.push(
                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
                    .to_canonical()?
                    .into_array(),
            );
        }

        Ok(Self::new_unchecked(new_chunks, self.dtype().clone()))
    }
}

impl FromIterator<ArrayRef> for ChunkedArray {
    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
        let dtype = chunks
            .first()
            .map(|c| c.dtype().clone())
            .vortex_expect("Cannot infer DType from an empty iterator");
        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
    }
}

impl ArrayVTable<ChunkedVTable> for ChunkedVTable {
    fn len(array: &ChunkedArray) -> usize {
        array.len
    }

    fn dtype(array: &ChunkedArray) -> &DType {
        &array.dtype
    }

    fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
        array.stats_set.to_ref(array.as_ref())
    }
}

impl ValidityVTable<ChunkedVTable> for ChunkedVTable {
    fn is_valid(array: &ChunkedArray, index: usize) -> VortexResult<bool> {
        if !array.dtype.is_nullable() {
            return Ok(true);
        }
        let (chunk, offset_in_chunk) = array.find_chunk_idx(index);
        array.chunk(chunk)?.is_valid(offset_in_chunk)
    }

    fn all_valid(array: &ChunkedArray) -> VortexResult<bool> {
        if !array.dtype().is_nullable() {
            return Ok(true);
        }
        for chunk in array.chunks() {
            if !chunk.all_valid()? {
                return Ok(false);
            }
        }
        Ok(true)
    }

    fn all_invalid(array: &ChunkedArray) -> VortexResult<bool> {
        if !array.dtype().is_nullable() {
            return Ok(false);
        }
        for chunk in array.chunks() {
            if !chunk.all_invalid()? {
                return Ok(false);
            }
        }
        Ok(true)
    }

    fn validity_mask(array: &ChunkedArray) -> VortexResult<Mask> {
        array
            .chunks()
            .iter()
            .map(|a| a.validity_mask())
            .try_collect()
    }
}

#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::ChunkedVTable;
    use crate::arrays::chunked::ChunkedArray;
    use crate::compute::conformance::binary_numeric::test_numeric;
    use crate::compute::{cast, sub_scalar};
    use crate::{IntoArray, ToCanonical, assert_arrays_eq};

    fn chunked_array() -> ChunkedArray {
        ChunkedArray::try_new(
            vec![
                buffer![1u64, 2, 3].into_array(),
                buffer![4u64, 5, 6].into_array(),
                buffer![7u64, 8, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap()
    }

    #[test]
    fn test_scalar_subtract() {
        let chunked = chunked_array().into_array();
        let to_subtract = 1u64;
        let array = sub_scalar(&chunked, to_subtract.into()).unwrap();

        let chunked = array.as_::<ChunkedVTable>();
        let chunks_out = chunked.chunks();

        let results = chunks_out[0]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[0u64, 1, 2]);
        let results = chunks_out[1]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[3u64, 4, 5]);
        let results = chunks_out[2]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[6u64, 7, 8]);
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        // greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]

        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_chunked_binary_numeric() {
        let array = chunked_array();
        // The conformance tests exercise both X - 1 and 1 - X, so we need signed values.
        let signed_dtype = DType::from(PType::try_from(array.dtype()).unwrap().to_signed());
        let array = cast(array.as_ref(), &signed_dtype).unwrap();
        test_numeric::<u64>(array)
    }
}