vortex_array/arrays/chunked/
array.rs1use std::fmt::Debug;
9
10use futures::stream;
11use vortex_buffer::{Buffer, BufferMut};
12use vortex_dtype::DType;
13use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
14
15use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
16use crate::search_sorted::{SearchSorted, SearchSortedSide};
17use crate::stats::ArrayStats;
18use crate::stream::{ArrayStream, ArrayStreamAdapter};
19use crate::{Array, ArrayRef, IntoArray};
20
21#[derive(Clone, Debug)]
22pub struct ChunkedArray {
23 pub(super) dtype: DType,
24 pub(super) len: usize,
25 pub(super) chunk_offsets: Buffer<u64>,
26 pub(super) chunks: Vec<ArrayRef>,
27 pub(super) stats_set: ArrayStats,
28}
29
30impl ChunkedArray {
31 pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
40 Self::validate(&chunks, &dtype)?;
41
42 unsafe { Ok(Self::new_unchecked(chunks, dtype)) }
44 }
45
46 pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
55 #[cfg(debug_assertions)]
56 Self::validate(&chunks, &dtype)
57 .vortex_expect("[Debug Assertion]: Invalid `ChunkedArray` parameters");
58
59 let nchunks = chunks.len();
60
61 let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
62 unsafe { chunk_offsets.push_unchecked(0) }
64 let mut curr_offset = 0;
65 for c in &chunks {
66 curr_offset += c.len() as u64;
67 unsafe { chunk_offsets.push_unchecked(curr_offset) }
69 }
70
71 Self {
72 dtype,
73 len: curr_offset.try_into().vortex_unwrap(),
74 chunk_offsets: chunk_offsets.freeze(),
75 chunks,
76 stats_set: Default::default(),
77 }
78 }
79
80 pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
84 for chunk in chunks {
85 if chunk.dtype() != dtype {
86 vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
87 }
88 }
89
90 Ok(())
91 }
92
93 #[inline]
94 pub fn chunk(&self, idx: usize) -> &ArrayRef {
95 assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");
96
97 &self.chunks[idx]
98 }
99
100 pub fn nchunks(&self) -> usize {
101 self.chunks.len()
102 }
103
104 #[inline]
105 pub fn chunk_offsets(&self) -> &Buffer<u64> {
106 &self.chunk_offsets
107 }
108
109 pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
110 assert!(index <= self.len(), "Index out of bounds of the array");
111 let index = index as u64;
112
113 let index_chunk = self
116 .chunk_offsets()
117 .search_sorted(&index, SearchSortedSide::Right)
118 .to_ends_index(self.nchunks() + 1)
119 .saturating_sub(1);
120 let chunk_start = self.chunk_offsets()[index_chunk];
121
122 let index_in_chunk =
123 usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
124 (index_chunk, index_in_chunk)
125 }
126
127 pub fn chunks(&self) -> &[ArrayRef] {
128 &self.chunks
129 }
130
131 pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
132 self.chunks().iter().filter(|c| !c.is_empty())
133 }
134
135 pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
136 ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
137 }
138
139 pub fn array_stream(&self) -> impl ArrayStream + '_ {
140 ArrayStreamAdapter::new(
141 self.dtype().clone(),
142 stream::iter(self.chunks().iter().cloned().map(Ok)),
143 )
144 }
145
146 pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
147 let mut new_chunks = Vec::new();
148 let mut chunks_to_combine = Vec::new();
149 let mut new_chunk_n_bytes = 0;
150 let mut new_chunk_n_elements = 0;
151 for chunk in self.chunks() {
152 let n_bytes = chunk.nbytes();
153 let n_elements = chunk.len();
154
155 if (new_chunk_n_bytes + n_bytes > target_bytesize
156 || new_chunk_n_elements + n_elements > target_rowsize)
157 && !chunks_to_combine.is_empty()
158 {
159 new_chunks.push(
160 unsafe {
163 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
164 .to_canonical()
165 .into_array()
166 },
167 );
168
169 new_chunk_n_bytes = 0;
170 new_chunk_n_elements = 0;
171 chunks_to_combine = Vec::new();
172 }
173
174 if n_bytes > target_bytesize || n_elements > target_rowsize {
175 new_chunks.push(chunk.clone());
176 } else {
177 new_chunk_n_bytes += n_bytes;
178 new_chunk_n_elements += n_elements;
179 chunks_to_combine.push(chunk.clone());
180 }
181 }
182
183 if !chunks_to_combine.is_empty() {
184 new_chunks.push(unsafe {
185 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
188 .to_canonical()
189 .into_array()
190 });
191 }
192
193 unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
196 }
197}
198
199impl FromIterator<ArrayRef> for ChunkedArray {
200 fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
201 let chunks: Vec<ArrayRef> = iter.into_iter().collect();
202 let dtype = chunks
203 .first()
204 .map(|c| c.dtype().clone())
205 .vortex_expect("Cannot infer DType from an empty iterator");
206 Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
207 }
208}
209
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::arrays::{ChunkedVTable, PrimitiveArray};
    use crate::compute::sub_scalar;
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical, assert_arrays_eq};

    /// Non-nullable `u64` dtype shared by the arithmetic and rechunk tests.
    fn non_nullable_u64() -> DType {
        DType::Primitive(PType::U64, Nullability::NonNullable)
    }

    /// Nullable `u64` dtype shared by the validity tests.
    fn nullable_u64() -> DType {
        DType::Primitive(PType::U64, Nullability::Nullable)
    }

    /// Three chunks of three elements each, holding the values 1 through 9.
    fn chunked_array() -> ChunkedArray {
        let chunks = vec![
            buffer![1u64, 2, 3].into_array(),
            buffer![4u64, 5, 6].into_array(),
            buffer![7u64, 8, 9].into_array(),
        ];
        ChunkedArray::try_new(chunks, non_nullable_u64()).unwrap()
    }

    #[test]
    fn test_scalar_subtract() {
        let input = chunked_array().into_array();
        let to_subtract = 1u64;
        let array = sub_scalar(&input, to_subtract.into()).unwrap();

        let chunked = array.as_::<ChunkedVTable>();
        let chunks_out = chunked.chunks();

        // Each chunk is checked by position, in order.
        let expected: [&[u64]; 3] = [&[0, 1, 2], &[3, 4, 5], &[6, 7, 8]];
        for (idx, want) in expected.iter().enumerate() {
            let results = chunks_out[idx].to_primitive().as_slice::<u64>().to_vec();
            assert_eq!(results, *want);
        }
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunks = vec![buffer![0u64].into_array()];
        let chunked = ChunkedArray::try_new(chunks, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunks = vec![buffer![0u64].into_array(), buffer![5u64].into_array()];
        let chunked = ChunkedArray::try_new(chunks, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunks = vec![
            buffer![0u64, 1, 2, 3].into_array(),
            buffer![4u64, 5].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        for c in rechunked.chunks() {
            assert!(c.len() < 5);
        }
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunks = vec![
            buffer![0u64, 1, 2].into_array(),
            buffer![42_u64; 6].into_array(),
            buffer![4u64, 5].into_array(),
            buffer![6u64, 7].into_array(),
            buffer![8u64, 9].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Non-empty and empty chunks interleaved, all marked `AllValid`.
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
        ];

        let dtype = nullable_u64();
        let chunked = ChunkedArray::try_new(chunks, dtype)?;

        assert!(chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Non-empty and empty chunks interleaved, all marked `AllInvalid`.
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];

        let dtype = nullable_u64();
        let chunked = ChunkedArray::try_new(chunks, dtype)?;

        assert!(!chunked.all_valid());
        assert!(chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        // One `AllValid` and one `AllInvalid` non-empty chunk, each followed by an empty chunk
        // with the same validity.
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];

        let dtype = nullable_u64();
        let chunked = ChunkedArray::try_new(chunks, dtype)?;

        assert!(!chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }
}