vortex_array/arrays/chunked/
array.rs1use std::fmt::Debug;
9
10use futures::stream;
11use vortex_buffer::Buffer;
12use vortex_buffer::BufferMut;
13use vortex_dtype::DType;
14use vortex_error::VortexExpect as _;
15use vortex_error::VortexResult;
16use vortex_error::VortexUnwrap;
17use vortex_error::vortex_bail;
18
19use crate::Array;
20use crate::ArrayRef;
21use crate::IntoArray;
22use crate::arrays::PrimitiveArray;
23use crate::iter::ArrayIterator;
24use crate::iter::ArrayIteratorAdapter;
25use crate::search_sorted::SearchSorted;
26use crate::search_sorted::SearchSortedSide;
27use crate::stats::ArrayStats;
28use crate::stream::ArrayStream;
29use crate::stream::ArrayStreamAdapter;
30use crate::validity::Validity;
31
32#[derive(Clone, Debug)]
33pub struct ChunkedArray {
34 pub(super) dtype: DType,
35 pub(super) len: usize,
36 pub(super) chunk_offsets: PrimitiveArray,
37 pub(super) chunks: Vec<ArrayRef>,
38 pub(super) stats_set: ArrayStats,
39}
40
41impl ChunkedArray {
42 pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
51 Self::validate(&chunks, &dtype)?;
52
53 unsafe { Ok(Self::new_unchecked(chunks, dtype)) }
55 }
56
57 pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
66 #[cfg(debug_assertions)]
67 Self::validate(&chunks, &dtype)
68 .vortex_expect("[Debug Assertion]: Invalid `ChunkedArray` parameters");
69
70 let nchunks = chunks.len();
71
72 let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(nchunks + 1);
73 unsafe { chunk_offsets_buf.push_unchecked(0) }
75 let mut curr_offset = 0;
76 for c in &chunks {
77 curr_offset += c.len() as u64;
78 unsafe { chunk_offsets_buf.push_unchecked(curr_offset) }
80 }
81
82 let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable);
83
84 Self {
85 dtype,
86 len: curr_offset.try_into().vortex_unwrap(),
87 chunk_offsets,
88 chunks,
89 stats_set: Default::default(),
90 }
91 }
92
93 pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
97 for chunk in chunks {
98 if chunk.dtype() != dtype {
99 vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
100 }
101 }
102
103 Ok(())
104 }
105
106 #[inline]
107 pub fn chunk(&self, idx: usize) -> &ArrayRef {
108 assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");
109
110 &self.chunks[idx]
111 }
112
113 pub fn nchunks(&self) -> usize {
114 self.chunks.len()
115 }
116
117 #[inline]
118 pub fn chunk_offsets(&self) -> Buffer<u64> {
119 self.chunk_offsets.buffer()
120 }
121
122 pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
123 assert!(index <= self.len(), "Index out of bounds of the array");
124 let index = index as u64;
125
126 let index_chunk = self
129 .chunk_offsets()
130 .search_sorted(&index, SearchSortedSide::Right)
131 .to_ends_index(self.nchunks() + 1)
132 .saturating_sub(1);
133 let chunk_start = self.chunk_offsets()[index_chunk];
134
135 let index_in_chunk =
136 usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
137 (index_chunk, index_in_chunk)
138 }
139
140 pub fn chunks(&self) -> &[ArrayRef] {
141 &self.chunks
142 }
143
144 pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
145 self.chunks().iter().filter(|c| !c.is_empty())
146 }
147
148 pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
149 ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
150 }
151
152 pub fn array_stream(&self) -> impl ArrayStream + '_ {
153 ArrayStreamAdapter::new(
154 self.dtype().clone(),
155 stream::iter(self.chunks().iter().cloned().map(Ok)),
156 )
157 }
158
159 pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
160 let mut new_chunks = Vec::new();
161 let mut chunks_to_combine = Vec::new();
162 let mut new_chunk_n_bytes = 0;
163 let mut new_chunk_n_elements = 0;
164 for chunk in self.chunks() {
165 let n_bytes = chunk.nbytes();
166 let n_elements = chunk.len();
167
168 if (new_chunk_n_bytes + n_bytes > target_bytesize
169 || new_chunk_n_elements + n_elements > target_rowsize)
170 && !chunks_to_combine.is_empty()
171 {
172 new_chunks.push(
173 unsafe {
176 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
177 .to_canonical()
178 .into_array()
179 },
180 );
181
182 new_chunk_n_bytes = 0;
183 new_chunk_n_elements = 0;
184 chunks_to_combine = Vec::new();
185 }
186
187 if n_bytes > target_bytesize || n_elements > target_rowsize {
188 new_chunks.push(chunk.clone());
189 } else {
190 new_chunk_n_bytes += n_bytes;
191 new_chunk_n_elements += n_elements;
192 chunks_to_combine.push(chunk.clone());
193 }
194 }
195
196 if !chunks_to_combine.is_empty() {
197 new_chunks.push(unsafe {
198 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
201 .to_canonical()
202 .into_array()
203 });
204 }
205
206 unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
209 }
210}
211
212impl FromIterator<ArrayRef> for ChunkedArray {
213 fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
214 let chunks: Vec<ArrayRef> = iter.into_iter().collect();
215 let dtype = chunks
216 .first()
217 .map(|c| c.dtype().clone())
218 .vortex_expect("Cannot infer DType from an empty iterator");
219 Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
220 }
221}
222
223#[cfg(test)]
224mod test {
225 use vortex_buffer::buffer;
226 use vortex_dtype::DType;
227 use vortex_dtype::Nullability;
228 use vortex_dtype::PType;
229 use vortex_error::VortexResult;
230
231 use crate::IntoArray;
232 use crate::ToCanonical;
233 use crate::array::Array;
234 use crate::arrays::ChunkedVTable;
235 use crate::arrays::PrimitiveArray;
236 use crate::arrays::chunked::ChunkedArray;
237 use crate::assert_arrays_eq;
238 use crate::compute::sub_scalar;
239 use crate::validity::Validity;
240
241 fn chunked_array() -> ChunkedArray {
242 ChunkedArray::try_new(
243 vec![
244 buffer![1u64, 2, 3].into_array(),
245 buffer![4u64, 5, 6].into_array(),
246 buffer![7u64, 8, 9].into_array(),
247 ],
248 DType::Primitive(PType::U64, Nullability::NonNullable),
249 )
250 .unwrap()
251 }
252
/// Subtracting a scalar from a chunked array yields the per-chunk subtracted values.
#[test]
fn test_scalar_subtract() {
    let input = chunked_array().into_array();
    let subtrahend = 1u64;
    let output = sub_scalar(&input, subtrahend.into()).unwrap();

    let output = output.as_::<ChunkedVTable>();
    let chunks = output.chunks();

    // Expected contents of the first three output chunks.
    let expected: [&[u64]; 3] = [&[0, 1, 2], &[3, 4, 5], &[6, 7, 8]];
    for (idx, want) in expected.iter().enumerate() {
        let got = chunks[idx].to_primitive().as_slice::<u64>().to_vec();
        assert_eq!(got, *want);
    }
}
269
/// Rechunking a single small chunk keeps the array contents unchanged.
#[test]
fn test_rechunk_one_chunk() {
    let dtype = DType::Primitive(PType::U64, Nullability::NonNullable);
    let chunked = ChunkedArray::try_new(vec![buffer![0u64].into_array()], dtype).unwrap();

    let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

    assert_arrays_eq!(chunked, rechunked);
}
282
/// Two small chunks that fit within both targets are merged into one chunk.
#[test]
fn test_rechunk_two_chunks() {
    let dtype = DType::Primitive(PType::U64, Nullability::NonNullable);
    let chunks = vec![buffer![0u64].into_array(), buffer![5u64].into_array()];
    let chunked = ChunkedArray::try_new(chunks, dtype).unwrap();

    let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

    assert_eq!(rechunked.nchunks(), 1);
    assert_arrays_eq!(chunked, rechunked);
}
296
/// With a row target of 5, a 4-row chunk and a 2-row chunk cannot be merged and stay separate.
#[test]
fn test_rechunk_tiny_target_chunks() {
    let dtype = DType::Primitive(PType::U64, Nullability::NonNullable);
    let chunks = vec![
        buffer![0u64, 1, 2, 3].into_array(),
        buffer![4u64, 5].into_array(),
    ];
    let chunked = ChunkedArray::try_new(chunks, dtype).unwrap();

    let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

    assert_eq!(rechunked.nchunks(), 2);
    for chunk in rechunked.chunks() {
        assert!(chunk.len() < 5);
    }
    assert_arrays_eq!(chunked, rechunked);
}
314
/// A chunk larger than the row target is passed through, while its small neighbours are merged
/// around it.
#[test]
fn test_rechunk_with_too_big_chunk() {
    let dtype = DType::Primitive(PType::U64, Nullability::NonNullable);
    let chunks = vec![
        buffer![0u64, 1, 2].into_array(),
        // Six rows: larger than the row target of 5 used below.
        buffer![42_u64; 6].into_array(),
        buffer![4u64, 5].into_array(),
        buffer![6u64, 7].into_array(),
        buffer![8u64, 9].into_array(),
    ];
    let chunked = ChunkedArray::try_new(chunks, dtype).unwrap();

    let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

    // Resulting row counts: 3, 6 (passed through), 4 (2 + 2 merged), 2.
    assert_eq!(rechunked.nchunks(), 4);
    assert_arrays_eq!(chunked, rechunked);
}
335
/// All-valid chunks interleaved with empty chunks: the chunked array reports all-valid.
#[test]
fn test_empty_chunks_all_valid() -> VortexResult<()> {
    let dtype = DType::Primitive(PType::U64, Nullability::Nullable);
    let chunks = vec![
        PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
        // Empty chunk.
        PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
        PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
        // Empty chunk.
        PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
    ];

    let chunked = ChunkedArray::try_new(chunks, dtype)?;

    assert!(chunked.all_valid());
    assert!(!chunked.all_invalid());

    Ok(())
}
355
/// All-invalid chunks interleaved with empty chunks: the chunked array reports all-invalid.
#[test]
fn test_empty_chunks_all_invalid() -> VortexResult<()> {
    let dtype = DType::Primitive(PType::U64, Nullability::Nullable);
    let chunks = vec![
        PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
        // Empty chunk.
        PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
        // Empty chunk.
        PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
    ];

    let chunked = ChunkedArray::try_new(chunks, dtype)?;

    assert!(!chunked.all_valid());
    assert!(chunked.all_invalid());

    Ok(())
}
375
/// A mix of all-valid and all-invalid chunks (plus empty ones) is neither all-valid nor
/// all-invalid.
#[test]
fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
    let dtype = DType::Primitive(PType::U64, Nullability::Nullable);
    let chunks = vec![
        PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
        // Empty chunk.
        PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
        PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
        // Empty chunk.
        PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
    ];

    let chunked = ChunkedArray::try_new(chunks, dtype)?;

    assert!(!chunked.all_valid());
    assert!(!chunked.all_invalid());

    Ok(())
}
395}