vortex_array/arrays/chunked/
mod.rs

1//! First-class chunked arrays.
2//!
//! Vortex is a chunked array library: a [`ChunkedArray`] represents one logical array as an ordered sequence of chunks that share a single [`DType`].
4
5use std::fmt::Debug;
6
7use futures_util::stream;
8use itertools::Itertools;
9use vortex_buffer::{Buffer, BufferMut};
10use vortex_dtype::DType;
11use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
12use vortex_mask::Mask;
13
14use crate::array::ArrayValidityImpl;
15use crate::compute::{ComputeFn, InvocationArgs, Output, SearchSorted, SearchSortedSide};
16use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
17use crate::nbytes::NBytes;
18use crate::stats::{ArrayStats, StatsSetRef};
19use crate::stream::{ArrayStream, ArrayStreamAdapter};
20use crate::vtable::VTableRef;
21use crate::{Array, ArrayImpl, ArrayRef, ArrayStatisticsImpl, EmptyMetadata, Encoding, IntoArray};
22
23mod canonical;
24mod compute;
25mod serde;
26mod variants;
27
/// A logical array stored as an ordered sequence of chunks.
///
/// All chunks are expected to share the array's [`DType`]; this is checked by
/// [`ChunkedArray::try_new`] but not by [`ChunkedArray::new_unchecked`].
#[derive(Clone, Debug)]
pub struct ChunkedArray {
    /// The logical dtype of the array (and of every chunk).
    dtype: DType,
    /// Total number of elements across all chunks.
    len: usize,
    /// Running sum of chunk lengths with a leading 0: entry `i` is the logical start of
    /// chunk `i`, and the final entry equals `len` (so there are `chunks.len() + 1` entries).
    chunk_offsets: Buffer<u64>,
    /// The chunks, in logical order.
    chunks: Vec<ArrayRef>,
    /// Statistics storage for this array, exposed through `ArrayStatisticsImpl`.
    stats_set: ArrayStats,
}
36
/// Encoding marker type for [`ChunkedArray`].
#[derive(Debug)]
pub struct ChunkedEncoding;
impl Encoding for ChunkedEncoding {
    type Array = ChunkedArray;
    // Chunked arrays carry no encoding-specific metadata of their own.
    type Metadata = EmptyMetadata;
}
43
44impl ChunkedArray {
45    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
46        for chunk in &chunks {
47            if chunk.dtype() != &dtype {
48                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
49            }
50        }
51
52        Ok(Self::new_unchecked(chunks, dtype))
53    }
54
55    pub fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
56        let nchunks = chunks.len();
57
58        let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
59        unsafe { chunk_offsets.push_unchecked(0) }
60        let mut curr_offset = 0;
61        for c in &chunks {
62            curr_offset += c.len() as u64;
63            unsafe { chunk_offsets.push_unchecked(curr_offset) }
64        }
65        assert_eq!(chunk_offsets.len(), nchunks + 1);
66
67        Self {
68            dtype,
69            len: curr_offset.try_into().vortex_unwrap(),
70            chunk_offsets: chunk_offsets.freeze(),
71            chunks,
72            stats_set: Default::default(),
73        }
74    }
75
76    // TODO(ngates): remove result
77    #[inline]
78    pub fn chunk(&self, idx: usize) -> VortexResult<&ArrayRef> {
79        if idx >= self.nchunks() {
80            vortex_bail!("chunk index {} > num chunks ({})", idx, self.nchunks());
81        }
82        Ok(&self.chunks[idx])
83    }
84
85    pub fn nchunks(&self) -> usize {
86        self.chunks.len()
87    }
88
89    #[inline]
90    pub fn chunk_offsets(&self) -> &[u64] {
91        &self.chunk_offsets
92    }
93
94    fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
95        assert!(index <= self.len(), "Index out of bounds of the array");
96        let index = index as u64;
97
98        // Since there might be duplicate values in offsets because of empty chunks we want to search from right
99        // and take the last chunk (we subtract 1 since there's a leading 0)
100        let index_chunk = self
101            .chunk_offsets()
102            .search_sorted(&index, SearchSortedSide::Right)
103            .to_ends_index(self.nchunks() + 1)
104            .saturating_sub(1);
105        let chunk_start = self.chunk_offsets()[index_chunk];
106
107        let index_in_chunk =
108            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
109        (index_chunk, index_in_chunk)
110    }
111
112    pub fn chunks(&self) -> &[ArrayRef] {
113        &self.chunks
114    }
115
116    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
117        self.chunks().iter().filter(|c| !c.is_empty())
118    }
119
120    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
121        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
122    }
123
124    pub fn array_stream(&self) -> impl ArrayStream + '_ {
125        ArrayStreamAdapter::new(
126            self.dtype().clone(),
127            stream::iter(self.chunks().iter().cloned().map(Ok)),
128        )
129    }
130
131    pub fn rechunk(&self, target_bytesize: usize, target_rowsize: usize) -> VortexResult<Self> {
132        let mut new_chunks = Vec::new();
133        let mut chunks_to_combine = Vec::new();
134        let mut new_chunk_n_bytes = 0;
135        let mut new_chunk_n_elements = 0;
136        for chunk in self.chunks() {
137            let n_bytes = chunk.nbytes();
138            let n_elements = chunk.len();
139
140            if (new_chunk_n_bytes + n_bytes > target_bytesize
141                || new_chunk_n_elements + n_elements > target_rowsize)
142                && !chunks_to_combine.is_empty()
143            {
144                new_chunks.push(
145                    ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
146                        .to_canonical()?
147                        .into_array(),
148                );
149
150                new_chunk_n_bytes = 0;
151                new_chunk_n_elements = 0;
152                chunks_to_combine = Vec::new();
153            }
154
155            if n_bytes > target_bytesize || n_elements > target_rowsize {
156                new_chunks.push(chunk.clone());
157            } else {
158                new_chunk_n_bytes += n_bytes;
159                new_chunk_n_elements += n_elements;
160                chunks_to_combine.push(chunk.clone());
161            }
162        }
163
164        if !chunks_to_combine.is_empty() {
165            new_chunks.push(
166                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
167                    .to_canonical()?
168                    .into_array(),
169            );
170        }
171
172        Ok(Self::new_unchecked(new_chunks, self.dtype().clone()))
173    }
174}
175
176impl FromIterator<ArrayRef> for ChunkedArray {
177    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
178        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
179        let dtype = chunks
180            .first()
181            .map(|c| c.dtype().clone())
182            .vortex_expect("Cannot infer DType from an empty iterator");
183        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
184    }
185}
186
187impl ArrayImpl for ChunkedArray {
188    type Encoding = ChunkedEncoding;
189
190    fn _len(&self) -> usize {
191        self.len
192    }
193
194    fn _dtype(&self) -> &DType {
195        &self.dtype
196    }
197
198    fn _vtable(&self) -> VTableRef {
199        VTableRef::new_ref(&ChunkedEncoding)
200    }
201
202    fn _with_children(&self, children: &[ArrayRef]) -> VortexResult<Self> {
203        Ok(ChunkedArray::new_unchecked(
204            // We skip the first child as it contains the offsets buffer.
205            children[1..].to_vec(),
206            self.dtype.clone(),
207        ))
208    }
209
210    fn _invoke(
211        &self,
212        compute_fn: &ComputeFn,
213        args: &InvocationArgs,
214    ) -> VortexResult<Option<Output>> {
215        if compute_fn.is_elementwise() {
216            return self.invoke_elementwise(compute_fn, args);
217        }
218        Ok(None)
219    }
220}
221
222impl ArrayStatisticsImpl for ChunkedArray {
223    fn _stats_ref(&self) -> StatsSetRef<'_> {
224        self.stats_set.to_ref(self)
225    }
226}
227
228impl ArrayValidityImpl for ChunkedArray {
229    fn _is_valid(&self, index: usize) -> VortexResult<bool> {
230        if !self.dtype.is_nullable() {
231            return Ok(true);
232        }
233        let (chunk, offset_in_chunk) = self.find_chunk_idx(index);
234        self.chunk(chunk)?.is_valid(offset_in_chunk)
235    }
236
237    fn _all_valid(&self) -> VortexResult<bool> {
238        if !self.dtype().is_nullable() {
239            return Ok(true);
240        }
241        for chunk in self.chunks() {
242            if !chunk.all_valid()? {
243                return Ok(false);
244            }
245        }
246        Ok(true)
247    }
248
249    fn _all_invalid(&self) -> VortexResult<bool> {
250        if !self.dtype().is_nullable() {
251            return Ok(false);
252        }
253        for chunk in self.chunks() {
254            if !chunk.all_invalid()? {
255                return Ok(false);
256            }
257        }
258        Ok(true)
259    }
260
261    fn _validity_mask(&self) -> VortexResult<Mask> {
262        self.chunks()
263            .iter()
264            .map(|a| a.validity_mask())
265            .try_collect()
266    }
267}
268
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::compute::conformance::binary_numeric::test_numeric;
    use crate::compute::{cast, scalar_at, sub_scalar};
    use crate::{ArrayExt, IntoArray, ToCanonical, assert_arrays_eq};

    /// The non-nullable `u64` dtype shared by every fixture in this module.
    fn u64_dtype() -> DType {
        DType::Primitive(PType::U64, Nullability::NonNullable)
    }

    /// Three `u64` chunks holding 1..=9 in order.
    fn chunked_array() -> ChunkedArray {
        let parts = vec![
            buffer![1u64, 2, 3].into_array(),
            buffer![4u64, 5, 6].into_array(),
            buffer![7u64, 8, 9].into_array(),
        ];
        ChunkedArray::try_new(parts, u64_dtype()).unwrap()
    }

    #[test]
    fn test_scalar_subtract() {
        let input = chunked_array().into_array();
        let subtracted = sub_scalar(&input, 1u64.into()).unwrap();
        let chunks_out = subtracted.as_::<ChunkedArray>().chunks();

        // Subtraction should preserve the chunk layout, chunk by chunk.
        let expected: [&[u64]; 3] = [&[0, 1, 2], &[3, 4, 5], &[6, 7, 8]];
        for (pos, want) in expected.iter().enumerate() {
            let got = chunks_out[pos]
                .to_primitive()
                .unwrap()
                .as_slice::<u64>()
                .to_vec();
            assert_eq!(got, *want);
        }
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let original = ChunkedArray::try_new(vec![buffer![0u64].into_array()], u64_dtype()).unwrap();

        let regrouped = original.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(original, regrouped);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let parts = vec![buffer![0u64].into_array(), buffer![5u64].into_array()];
        let original = ChunkedArray::try_new(parts, u64_dtype()).unwrap();

        let regrouped = original.rechunk(1 << 16, 1 << 16).unwrap();

        // Both tiny chunks fit under the targets, so they merge into one.
        assert_eq!(regrouped.nchunks(), 1);
        assert_arrays_eq!(original, regrouped);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let parts = vec![
            buffer![0u64, 1, 2, 3].into_array(),
            buffer![4u64, 5].into_array(),
        ];
        let original = ChunkedArray::try_new(parts, u64_dtype()).unwrap();

        let regrouped = original.rechunk(1 << 16, 5).unwrap();

        // 4 + 2 rows exceeds the 5-row target, so the chunks must stay separate.
        assert_eq!(regrouped.nchunks(), 2);
        assert!(regrouped.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(original, regrouped);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let parts = vec![
            buffer![0u64, 1, 2].into_array(),
            buffer![42_u64; 6].into_array(),
            buffer![4u64, 5].into_array(),
            buffer![6u64, 7].into_array(),
            buffer![8u64, 9].into_array(),
        ];
        let original = ChunkedArray::try_new(parts, u64_dtype()).unwrap();

        let regrouped = original.rechunk(1 << 16, 5).unwrap();
        // greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]

        assert_eq!(regrouped.nchunks(), 4);
        assert_arrays_eq!(original, regrouped);
    }

    #[test]
    fn test_chunked_binary_numeric() {
        let unsigned = chunked_array();
        // The tests test both X - 1 and 1 - X, so we need signed values
        let signed_dtype = DType::from(PType::try_from(unsigned.dtype()).unwrap().to_signed());
        let signed = cast(&unsigned, &signed_dtype).unwrap();
        test_numeric::<u64>(signed)
    }
}