// vortex_array/arrays/chunked/array.rs
use std::fmt::Debug;
use std::mem;

use futures_util::stream;
use vortex_buffer::{Buffer, BufferMut};
use vortex_dtype::DType;
use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
use vortex_mask::Mask;

use crate::arrays::ChunkedVTable;
use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
use crate::search_sorted::{SearchSorted, SearchSortedSide};
use crate::stats::{ArrayStats, StatsSetRef};
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::vtable::{ArrayVTable, ValidityVTable};
use crate::{Array, ArrayRef, IntoArray};

24#[derive(Clone, Debug)]
25pub struct ChunkedArray {
26 dtype: DType,
27 len: usize,
28 chunk_offsets: Buffer<u64>,
29 chunks: Vec<ArrayRef>,
30 stats_set: ArrayStats,
31}
32
33impl ChunkedArray {
34 pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
35 for chunk in &chunks {
36 if chunk.dtype() != &dtype {
37 vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
38 }
39 }
40
41 unsafe { Ok(Self::new_unchecked(chunks, dtype)) }
43 }
44
45 pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
53 let nchunks = chunks.len();
54
55 let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
56 unsafe { chunk_offsets.push_unchecked(0) }
58 let mut curr_offset = 0;
59 for c in &chunks {
60 curr_offset += c.len() as u64;
61 unsafe { chunk_offsets.push_unchecked(curr_offset) }
63 }
64
65 Self {
66 dtype,
67 len: curr_offset.try_into().vortex_unwrap(),
68 chunk_offsets: chunk_offsets.freeze(),
69 chunks,
70 stats_set: Default::default(),
71 }
72 }
73
74 #[inline]
75 pub fn chunk(&self, idx: usize) -> &ArrayRef {
76 assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");
77
78 &self.chunks[idx]
79 }
80
81 pub fn nchunks(&self) -> usize {
82 self.chunks.len()
83 }
84
85 #[inline]
86 pub fn chunk_offsets(&self) -> &Buffer<u64> {
87 &self.chunk_offsets
88 }
89
90 pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
91 assert!(index <= self.len(), "Index out of bounds of the array");
92 let index = index as u64;
93
94 let index_chunk = self
97 .chunk_offsets()
98 .search_sorted(&index, SearchSortedSide::Right)
99 .to_ends_index(self.nchunks() + 1)
100 .saturating_sub(1);
101 let chunk_start = self.chunk_offsets()[index_chunk];
102
103 let index_in_chunk =
104 usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
105 (index_chunk, index_in_chunk)
106 }
107
108 pub fn chunks(&self) -> &[ArrayRef] {
109 &self.chunks
110 }
111
112 pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
113 self.chunks().iter().filter(|c| !c.is_empty())
114 }
115
116 pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
117 ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
118 }
119
120 pub fn array_stream(&self) -> impl ArrayStream + '_ {
121 ArrayStreamAdapter::new(
122 self.dtype().clone(),
123 stream::iter(self.chunks().iter().cloned().map(Ok)),
124 )
125 }
126
127 pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
128 let mut new_chunks = Vec::new();
129 let mut chunks_to_combine = Vec::new();
130 let mut new_chunk_n_bytes = 0;
131 let mut new_chunk_n_elements = 0;
132 for chunk in self.chunks() {
133 let n_bytes = chunk.nbytes();
134 let n_elements = chunk.len();
135
136 if (new_chunk_n_bytes + n_bytes > target_bytesize
137 || new_chunk_n_elements + n_elements > target_rowsize)
138 && !chunks_to_combine.is_empty()
139 {
140 new_chunks.push(
141 unsafe {
143 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
144 .to_canonical()?
145 .into_array()
146 },
147 );
148
149 new_chunk_n_bytes = 0;
150 new_chunk_n_elements = 0;
151 chunks_to_combine = Vec::new();
152 }
153
154 if n_bytes > target_bytesize || n_elements > target_rowsize {
155 new_chunks.push(chunk.clone());
156 } else {
157 new_chunk_n_bytes += n_bytes;
158 new_chunk_n_elements += n_elements;
159 chunks_to_combine.push(chunk.clone());
160 }
161 }
162
163 if !chunks_to_combine.is_empty() {
164 new_chunks.push(unsafe {
165 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
167 .to_canonical()?
168 .into_array()
169 });
170 }
171
172 unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
174 }
175}
176
177impl FromIterator<ArrayRef> for ChunkedArray {
178 fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
179 let chunks: Vec<ArrayRef> = iter.into_iter().collect();
180 let dtype = chunks
181 .first()
182 .map(|c| c.dtype().clone())
183 .vortex_expect("Cannot infer DType from an empty iterator");
184 Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
185 }
186}
187
188impl ArrayVTable<ChunkedVTable> for ChunkedVTable {
189 fn len(array: &ChunkedArray) -> usize {
190 array.len
191 }
192
193 fn dtype(array: &ChunkedArray) -> &DType {
194 &array.dtype
195 }
196
197 fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
198 array.stats_set.to_ref(array.as_ref())
199 }
200}
201
202impl ValidityVTable<ChunkedVTable> for ChunkedVTable {
203 fn is_valid(array: &ChunkedArray, index: usize) -> bool {
204 if !array.dtype.is_nullable() {
205 return true;
206 }
207 let (chunk, offset_in_chunk) = array.find_chunk_idx(index);
208 array.chunk(chunk).is_valid(offset_in_chunk)
209 }
210
211 fn all_valid(array: &ChunkedArray) -> bool {
212 if !array.dtype().is_nullable() {
213 return true;
214 }
215 for chunk in array.non_empty_chunks() {
216 if !chunk.all_valid() {
217 return false;
218 }
219 }
220 true
221 }
222
223 fn all_invalid(array: &ChunkedArray) -> bool {
224 if !array.dtype().is_nullable() {
225 return false;
226 }
227 for chunk in array.non_empty_chunks() {
228 if !chunk.all_invalid() {
229 return false;
230 }
231 }
232 true
233 }
234
235 fn validity_mask(array: &ChunkedArray) -> Mask {
236 array.chunks().iter().map(|a| a.validity_mask()).collect()
237 }
238}
239
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::arrays::{ChunkedVTable, PrimitiveArray};
    use crate::compute::sub_scalar;
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical, assert_arrays_eq};

    /// Non-nullable `u64` dtype used by the arithmetic and rechunk tests.
    fn u64_dtype() -> DType {
        DType::Primitive(PType::U64, Nullability::NonNullable)
    }

    /// Nullable `u64` dtype used by the validity tests.
    fn nullable_u64_dtype() -> DType {
        DType::Primitive(PType::U64, Nullability::Nullable)
    }

    /// Three chunks holding `1..=9` in order.
    fn chunked_array() -> ChunkedArray {
        ChunkedArray::try_new(
            vec![
                buffer![1u64, 2, 3].into_array(),
                buffer![4u64, 5, 6].into_array(),
                buffer![7u64, 8, 9].into_array(),
            ],
            u64_dtype(),
        )
        .unwrap()
    }

    #[test]
    fn test_scalar_subtract() {
        let chunked = chunked_array().into_array();
        let to_subtract = 1u64;
        let array = sub_scalar(&chunked, to_subtract.into()).unwrap();

        // The result must remain chunked, with the subtraction applied per chunk.
        let chunked = array.as_::<ChunkedVTable>();
        let chunks_out = chunked.chunks();

        let expected: [&[u64]; 3] = [&[0, 1, 2], &[3, 4, 5], &[6, 7, 8]];
        for (idx, want) in expected.into_iter().enumerate() {
            let got = chunks_out[idx]
                .to_primitive()
                .unwrap()
                .as_slice::<u64>()
                .to_vec();
            assert_eq!(got, want);
        }
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked =
            ChunkedArray::try_new(vec![buffer![0u64].into_array()], u64_dtype()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            u64_dtype(),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        // Both tiny chunks fit within the targets, so they merge into one.
        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            u64_dtype(),
        )
        .unwrap();

        // 4 + 2 elements exceeds the 5-row target, so the two chunks stay separate.
        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            u64_dtype(),
        )
        .unwrap();

        // Expected grouping with a 5-row target:
        // [0, 1, 2] | [42; 6] (oversized, passed through) | [4, 5, 6, 7] | [8, 9]
        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            // Empty chunks must not influence the aggregate validity answers.
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
        ];

        let chunked = ChunkedArray::try_new(chunks, nullable_u64_dtype())?;

        assert!(chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            // Empty chunks must not influence the aggregate validity answers.
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];

        let chunked = ChunkedArray::try_new(chunks, nullable_u64_dtype())?;

        assert!(!chunked.all_valid());
        assert!(chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            // Empty chunks of either validity must not tip the answer one way or the other.
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];

        let chunked = ChunkedArray::try_new(chunks, nullable_u64_dtype())?;

        assert!(!chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }
}