vortex_array/arrays/chunked/
mod.rs

//! First-class chunked arrays.
//!
//! Vortex is a chunked array library: a [`ChunkedArray`] presents a sequence of chunks that all
//! share one dtype as a single logical array.
4
5use std::fmt::Debug;
6
7use futures_util::stream;
8use itertools::Itertools;
9use vortex_buffer::{Buffer, BufferMut};
10use vortex_dtype::DType;
11use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
12use vortex_mask::Mask;
13
14use crate::array::ArrayValidityImpl;
15use crate::compute::{SearchSorted, SearchSortedSide};
16use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
17use crate::nbytes::NBytes;
18use crate::stats::{ArrayStats, StatsSetRef};
19use crate::stream::{ArrayStream, ArrayStreamAdapter};
20use crate::vtable::VTableRef;
21use crate::{Array, ArrayImpl, ArrayRef, ArrayStatisticsImpl, EmptyMetadata, Encoding, IntoArray};
22
23mod canonical;
24mod compute;
25mod serde;
26mod variants;
27
28#[derive(Clone, Debug)]
29pub struct ChunkedArray {
30    dtype: DType,
31    len: usize,
32    chunk_offsets: Buffer<u64>,
33    chunks: Vec<ArrayRef>,
34    stats_set: ArrayStats,
35}
36
37pub struct ChunkedEncoding;
38impl Encoding for ChunkedEncoding {
39    type Array = ChunkedArray;
40    type Metadata = EmptyMetadata;
41}
42
43impl ChunkedArray {
44    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
45        for chunk in &chunks {
46            if chunk.dtype() != &dtype {
47                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
48            }
49        }
50
51        Ok(Self::new_unchecked(chunks, dtype))
52    }
53
54    pub fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
55        let nchunks = chunks.len();
56
57        let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
58        unsafe { chunk_offsets.push_unchecked(0) }
59        let mut curr_offset = 0;
60        for c in &chunks {
61            curr_offset += c.len() as u64;
62            unsafe { chunk_offsets.push_unchecked(curr_offset) }
63        }
64        assert_eq!(chunk_offsets.len(), nchunks + 1);
65
66        Self {
67            dtype,
68            len: curr_offset.try_into().vortex_unwrap(),
69            chunk_offsets: chunk_offsets.freeze(),
70            chunks,
71            stats_set: Default::default(),
72        }
73    }
74
75    // TODO(ngates): remove result
76    #[inline]
77    pub fn chunk(&self, idx: usize) -> VortexResult<&ArrayRef> {
78        if idx >= self.nchunks() {
79            vortex_bail!("chunk index {} > num chunks ({})", idx, self.nchunks());
80        }
81        Ok(&self.chunks[idx])
82    }
83
84    pub fn nchunks(&self) -> usize {
85        self.chunks.len()
86    }
87
88    #[inline]
89    pub fn chunk_offsets(&self) -> &[u64] {
90        &self.chunk_offsets
91    }
92
93    fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
94        assert!(index <= self.len(), "Index out of bounds of the array");
95        let index = index as u64;
96
97        // Since there might be duplicate values in offsets because of empty chunks we want to search from right
98        // and take the last chunk (we subtract 1 since there's a leading 0)
99        let index_chunk = self
100            .chunk_offsets()
101            .search_sorted(&index, SearchSortedSide::Right)
102            .to_ends_index(self.nchunks() + 1)
103            .saturating_sub(1);
104        let chunk_start = self.chunk_offsets()[index_chunk];
105
106        let index_in_chunk =
107            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
108        (index_chunk, index_in_chunk)
109    }
110
111    pub fn chunks(&self) -> &[ArrayRef] {
112        &self.chunks
113    }
114
115    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
116        self.chunks().iter().filter(|c| !c.is_empty())
117    }
118
119    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
120        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
121    }
122
123    pub fn array_stream(&self) -> impl ArrayStream + '_ {
124        ArrayStreamAdapter::new(
125            self.dtype().clone(),
126            stream::iter(self.chunks().iter().cloned().map(Ok)),
127        )
128    }
129
130    pub fn rechunk(&self, target_bytesize: usize, target_rowsize: usize) -> VortexResult<Self> {
131        let mut new_chunks = Vec::new();
132        let mut chunks_to_combine = Vec::new();
133        let mut new_chunk_n_bytes = 0;
134        let mut new_chunk_n_elements = 0;
135        for chunk in self.chunks() {
136            let n_bytes = chunk.nbytes();
137            let n_elements = chunk.len();
138
139            if (new_chunk_n_bytes + n_bytes > target_bytesize
140                || new_chunk_n_elements + n_elements > target_rowsize)
141                && !chunks_to_combine.is_empty()
142            {
143                new_chunks.push(
144                    ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
145                        .to_canonical()?
146                        .into_array(),
147                );
148
149                new_chunk_n_bytes = 0;
150                new_chunk_n_elements = 0;
151                chunks_to_combine = Vec::new();
152            }
153
154            if n_bytes > target_bytesize || n_elements > target_rowsize {
155                new_chunks.push(chunk.clone());
156            } else {
157                new_chunk_n_bytes += n_bytes;
158                new_chunk_n_elements += n_elements;
159                chunks_to_combine.push(chunk.clone());
160            }
161        }
162
163        if !chunks_to_combine.is_empty() {
164            new_chunks.push(
165                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
166                    .to_canonical()?
167                    .into_array(),
168            );
169        }
170
171        Ok(Self::new_unchecked(new_chunks, self.dtype().clone()))
172    }
173}
174
175impl FromIterator<ArrayRef> for ChunkedArray {
176    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
177        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
178        let dtype = chunks
179            .first()
180            .map(|c| c.dtype().clone())
181            .vortex_expect("Cannot infer DType from an empty iterator");
182        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
183    }
184}
185
186impl ArrayImpl for ChunkedArray {
187    type Encoding = ChunkedEncoding;
188
189    fn _len(&self) -> usize {
190        self.len
191    }
192
193    fn _dtype(&self) -> &DType {
194        &self.dtype
195    }
196
197    fn _vtable(&self) -> VTableRef {
198        VTableRef::new_ref(&ChunkedEncoding)
199    }
200
201    fn _with_children(&self, children: &[ArrayRef]) -> VortexResult<Self> {
202        Ok(ChunkedArray::new_unchecked(
203            // We skip the first child as it contains the offsets buffer.
204            children[1..].to_vec(),
205            self.dtype.clone(),
206        ))
207    }
208}
209
210impl ArrayStatisticsImpl for ChunkedArray {
211    fn _stats_ref(&self) -> StatsSetRef<'_> {
212        self.stats_set.to_ref(self)
213    }
214}
215
216impl ArrayValidityImpl for ChunkedArray {
217    fn _is_valid(&self, index: usize) -> VortexResult<bool> {
218        if !self.dtype.is_nullable() {
219            return Ok(true);
220        }
221        let (chunk, offset_in_chunk) = self.find_chunk_idx(index);
222        self.chunk(chunk)?.is_valid(offset_in_chunk)
223    }
224
225    fn _all_valid(&self) -> VortexResult<bool> {
226        if !self.dtype().is_nullable() {
227            return Ok(true);
228        }
229        for chunk in self.chunks() {
230            if !chunk.all_valid()? {
231                return Ok(false);
232            }
233        }
234        Ok(true)
235    }
236
237    fn _all_invalid(&self) -> VortexResult<bool> {
238        if !self.dtype().is_nullable() {
239            return Ok(false);
240        }
241        for chunk in self.chunks() {
242            if !chunk.all_invalid()? {
243                return Ok(false);
244            }
245        }
246        Ok(true)
247    }
248
249    fn _validity_mask(&self) -> VortexResult<Mask> {
250        self.chunks()
251            .iter()
252            .map(|a| a.validity_mask())
253            .try_collect()
254    }
255}
256
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::compute::conformance::binary_numeric::test_binary_numeric;
    use crate::compute::{scalar_at, sub_scalar, try_cast};
    use crate::{ArrayExt, IntoArray, ToCanonical, assert_arrays_eq};

    /// The dtype shared by every fixture in this module: non-nullable `u64`.
    fn u64_dtype() -> DType {
        DType::Primitive(PType::U64, Nullability::NonNullable)
    }

    /// Three chunks of three elements each, holding 1 through 9.
    fn chunked_array() -> ChunkedArray {
        let chunks = vec![
            buffer![1u64, 2, 3].into_array(),
            buffer![4u64, 5, 6].into_array(),
            buffer![7u64, 8, 9].into_array(),
        ];
        ChunkedArray::try_new(chunks, u64_dtype()).unwrap()
    }

    #[test]
    fn test_scalar_subtract() {
        let minuend = chunked_array().into_array();
        let difference = sub_scalar(&minuend, 1u64.into()).unwrap();

        let chunks_out = difference.as_::<ChunkedArray>().chunks();

        // Each of the first three output chunks should be its input chunk minus one.
        let expected_chunks: [&[u64]; 3] = [&[0, 1, 2], &[3, 4, 5], &[6, 7, 8]];
        for (idx, expected) in expected_chunks.iter().enumerate() {
            let actual = chunks_out[idx]
                .to_primitive()
                .unwrap()
                .as_slice::<u64>()
                .to_vec();
            assert_eq!(actual, *expected);
        }
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked =
            ChunkedArray::try_new(vec![buffer![0u64].into_array()], u64_dtype()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunks = vec![buffer![0u64].into_array(), buffer![5u64].into_array()];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunks = vec![
            buffer![0u64, 1, 2, 3].into_array(),
            buffer![4u64, 5].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunks = vec![
            buffer![0u64, 1, 2].into_array(),
            buffer![42_u64; 6].into_array(),
            buffer![4u64, 5].into_array(),
            buffer![6u64, 7].into_array(),
            buffer![8u64, 9].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, u64_dtype()).unwrap();

        // Rechunking is greedy, so the expected layout is:
        // [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]
        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_chunked_binary_numeric() {
        let unsigned = chunked_array();
        // The conformance suite exercises both X - 1 and 1 - X, so signed values are needed.
        let signed_ptype = PType::try_from(unsigned.dtype()).unwrap().to_signed();
        let signed_dtype = DType::from(signed_ptype);
        let signed = try_cast(&unsigned, &signed_dtype).unwrap();
        test_binary_numeric::<u64>(signed)
    }
}