vortex_array/arrays/chunked/
mod.rs

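//! First-class chunked arrays.
//!
//! Vortex is a chunked array library that's able to represent a single logical array as an
//! ordered sequence of physical chunks that all share one [`DType`].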
4
5use std::fmt::Debug;
6
7use futures_util::stream;
8use itertools::Itertools;
9use vortex_buffer::{Buffer, BufferMut};
10use vortex_dtype::DType;
11use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
12use vortex_mask::Mask;
13
14use crate::array::ArrayValidityImpl;
15use crate::compute::{SearchSorted, SearchSortedSide};
16use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
17use crate::nbytes::NBytes;
18use crate::stats::{ArrayStats, StatsSetRef};
19use crate::stream::{ArrayStream, ArrayStreamAdapter};
20use crate::validity::Validity;
21use crate::vtable::{EncodingVTable, VTableRef};
22use crate::{
23    Array, ArrayImpl, ArrayRef, ArrayStatisticsImpl, EmptyMetadata, Encoding, EncodingId, IntoArray,
24};
25
26mod canonical;
27mod compute;
28mod serde;
29mod stats;
30mod variants;
31
32#[derive(Clone, Debug)]
33pub struct ChunkedArray {
34    dtype: DType,
35    len: usize,
36    chunk_offsets: Buffer<u64>,
37    chunks: Vec<ArrayRef>,
38    stats_set: ArrayStats,
39}
40
41pub struct ChunkedEncoding;
42impl Encoding for ChunkedEncoding {
43    type Array = ChunkedArray;
44    type Metadata = EmptyMetadata;
45}
46
47impl EncodingVTable for ChunkedEncoding {
48    fn id(&self) -> EncodingId {
49        EncodingId::new_ref("vortex.chunked")
50    }
51}
52
53impl ChunkedArray {
54    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
55        for chunk in &chunks {
56            if chunk.dtype() != &dtype {
57                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
58            }
59        }
60
61        Ok(Self::new_unchecked(chunks, dtype))
62    }
63
64    pub fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
65        let nchunks = chunks.len();
66
67        let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
68        unsafe { chunk_offsets.push_unchecked(0) }
69        let mut curr_offset = 0;
70        for c in &chunks {
71            curr_offset += c.len() as u64;
72            unsafe { chunk_offsets.push_unchecked(curr_offset) }
73        }
74        assert_eq!(chunk_offsets.len(), nchunks + 1);
75
76        Self {
77            dtype,
78            len: curr_offset.try_into().vortex_unwrap(),
79            chunk_offsets: chunk_offsets.freeze(),
80            chunks,
81            stats_set: Default::default(),
82        }
83    }
84
85    // TODO(ngates): remove result
86    #[inline]
87    pub fn chunk(&self, idx: usize) -> VortexResult<&ArrayRef> {
88        if idx >= self.nchunks() {
89            vortex_bail!("chunk index {} > num chunks ({})", idx, self.nchunks());
90        }
91        Ok(&self.chunks[idx])
92    }
93
94    pub fn nchunks(&self) -> usize {
95        self.chunks.len()
96    }
97
98    #[inline]
99    pub fn chunk_offsets(&self) -> &[u64] {
100        &self.chunk_offsets
101    }
102
103    fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
104        assert!(index <= self.len(), "Index out of bounds of the array");
105        let index = index as u64;
106
107        // Since there might be duplicate values in offsets because of empty chunks we want to search from right
108        // and take the last chunk (we subtract 1 since there's a leading 0)
109        let index_chunk = self
110            .chunk_offsets()
111            .search_sorted(&index, SearchSortedSide::Right)
112            .to_ends_index(self.nchunks() + 1)
113            .saturating_sub(1);
114        let chunk_start = self.chunk_offsets()[index_chunk];
115
116        let index_in_chunk =
117            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
118        (index_chunk, index_in_chunk)
119    }
120
121    pub fn chunks(&self) -> &[ArrayRef] {
122        &self.chunks
123    }
124
125    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
126        self.chunks().iter().filter(|c| !c.is_empty())
127    }
128
129    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
130        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
131    }
132
133    pub fn array_stream(&self) -> impl ArrayStream + '_ {
134        ArrayStreamAdapter::new(
135            self.dtype().clone(),
136            stream::iter(self.chunks().iter().cloned().map(Ok)),
137        )
138    }
139
140    pub fn rechunk(&self, target_bytesize: usize, target_rowsize: usize) -> VortexResult<Self> {
141        let mut new_chunks = Vec::new();
142        let mut chunks_to_combine = Vec::new();
143        let mut new_chunk_n_bytes = 0;
144        let mut new_chunk_n_elements = 0;
145        for chunk in self.chunks() {
146            let n_bytes = chunk.nbytes();
147            let n_elements = chunk.len();
148
149            if (new_chunk_n_bytes + n_bytes > target_bytesize
150                || new_chunk_n_elements + n_elements > target_rowsize)
151                && !chunks_to_combine.is_empty()
152            {
153                new_chunks.push(
154                    ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
155                        .to_canonical()?
156                        .into_array(),
157                );
158
159                new_chunk_n_bytes = 0;
160                new_chunk_n_elements = 0;
161                chunks_to_combine = Vec::new();
162            }
163
164            if n_bytes > target_bytesize || n_elements > target_rowsize {
165                new_chunks.push(chunk.clone());
166            } else {
167                new_chunk_n_bytes += n_bytes;
168                new_chunk_n_elements += n_elements;
169                chunks_to_combine.push(chunk.clone());
170            }
171        }
172
173        if !chunks_to_combine.is_empty() {
174            new_chunks.push(
175                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
176                    .to_canonical()?
177                    .into_array(),
178            );
179        }
180
181        Ok(Self::new_unchecked(new_chunks, self.dtype().clone()))
182    }
183}
184
185impl FromIterator<ArrayRef> for ChunkedArray {
186    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
187        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
188        let dtype = chunks
189            .first()
190            .map(|c| c.dtype().clone())
191            .vortex_expect("Cannot infer DType from an empty iterator");
192        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
193    }
194}
195
196impl ArrayImpl for ChunkedArray {
197    type Encoding = ChunkedEncoding;
198
199    fn _len(&self) -> usize {
200        self.len
201    }
202
203    fn _dtype(&self) -> &DType {
204        &self.dtype
205    }
206
207    fn _vtable(&self) -> VTableRef {
208        VTableRef::new_ref(&ChunkedEncoding)
209    }
210}
211
212impl ArrayStatisticsImpl for ChunkedArray {
213    fn _stats_ref(&self) -> StatsSetRef<'_> {
214        self.stats_set.to_ref(self)
215    }
216}
217
218impl ArrayValidityImpl for ChunkedArray {
219    fn _is_valid(&self, index: usize) -> VortexResult<bool> {
220        if !self.dtype.is_nullable() {
221            return Ok(true);
222        }
223        let (chunk, offset_in_chunk) = self.find_chunk_idx(index);
224        self.chunk(chunk)?.is_valid(offset_in_chunk)
225    }
226
227    fn _all_valid(&self) -> VortexResult<bool> {
228        if !self.dtype().is_nullable() {
229            return Ok(true);
230        }
231        for chunk in self.chunks() {
232            if !chunk.all_valid()? {
233                return Ok(false);
234            }
235        }
236        Ok(true)
237    }
238
239    fn _all_invalid(&self) -> VortexResult<bool> {
240        if !self.dtype().is_nullable() {
241            return Ok(false);
242        }
243        for chunk in self.chunks() {
244            if !chunk.all_invalid()? {
245                return Ok(false);
246            }
247        }
248        Ok(true)
249    }
250
251    fn _validity_mask(&self) -> VortexResult<Mask> {
252        // TODO(ngates): implement FromIterator<LogicalValidity> for LogicalValidity.
253        // TODO(ngates): or use a boolean array builder?
254        let validity: Validity = self
255            .chunks()
256            .iter()
257            .map(|a| a.validity_mask())
258            .try_collect()?;
259        validity.to_logical(self.len())
260    }
261}
262
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::compute::test_harness::test_binary_numeric;
    use crate::compute::{scalar_at, sub_scalar, try_cast};
    use crate::{ArrayExt, IntoArray, ToCanonical, assert_arrays_eq};

    /// Non-nullable `u64` dtype shared by the fixtures in this module.
    fn u64_dtype() -> DType {
        DType::Primitive(PType::U64, Nullability::NonNullable)
    }

    /// Three chunks holding `1..=9`, three values per chunk.
    fn chunked_array() -> ChunkedArray {
        let chunks = vec![
            buffer![1u64, 2, 3].into_array(),
            buffer![4u64, 5, 6].into_array(),
            buffer![7u64, 8, 9].into_array(),
        ];
        ChunkedArray::try_new(chunks, u64_dtype()).unwrap()
    }

    #[test]
    fn test_scalar_subtract() {
        let input = chunked_array().into_array();
        let output = sub_scalar(&input, 1u64.into()).unwrap();
        let chunks_out = output.as_::<ChunkedArray>().chunks();

        // Each output chunk holds its input chunk's values minus one.
        let expected: [&[u64]; 3] = [&[0, 1, 2], &[3, 4, 5], &[6, 7, 8]];
        for (i, want) in expected.iter().enumerate() {
            let got = chunks_out[i]
                .to_primitive()
                .unwrap()
                .as_slice::<u64>()
                .to_vec();
            assert_eq!(got, *want);
        }
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunks = vec![buffer![0u64].into_array()];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunks = vec![buffer![0u64].into_array(), buffer![5u64].into_array()];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        // Both tiny chunks fit well under the targets, so they merge into one.
        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunks = vec![
            buffer![0u64, 1, 2, 3].into_array(),
            buffer![4u64, 5].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        // A row target of 5 cannot hold 4 + 2 rows, so the chunks stay separate.
        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunks = vec![
            buffer![0u64, 1, 2].into_array(),
            buffer![42_u64; 6].into_array(),
            buffer![4u64, 5].into_array(),
            buffer![6u64, 7].into_array(),
            buffer![8u64, 9].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        // Greedy grouping should yield: [0, 1, 2] [42 x 6] [4, 5, 6, 7] [8, 9]
        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_chunked_binary_numeric() {
        let array = chunked_array();
        // The harness exercises both X - 1 and 1 - X, so the input must be signed.
        let signed_dtype = DType::from(PType::try_from(array.dtype()).unwrap().to_signed());
        let array = try_cast(&array, &signed_dtype).unwrap();
        test_binary_numeric::<u64>(array)
    }
}