vortex_array/arrays/chunked/
array.rs1use std::fmt::Debug;
9
10use futures::stream;
11use vortex_buffer::Buffer;
12use vortex_buffer::BufferMut;
13use vortex_dtype::DType;
14use vortex_error::VortexExpect as _;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17
18use crate::Array;
19use crate::ArrayRef;
20use crate::IntoArray;
21use crate::arrays::PrimitiveArray;
22use crate::iter::ArrayIterator;
23use crate::iter::ArrayIteratorAdapter;
24use crate::search_sorted::SearchSorted;
25use crate::search_sorted::SearchSortedSide;
26use crate::stats::ArrayStats;
27use crate::stream::ArrayStream;
28use crate::stream::ArrayStreamAdapter;
29use crate::validity::Validity;
30
31#[derive(Clone, Debug)]
32pub struct ChunkedArray {
33 pub(super) dtype: DType,
34 pub(super) len: usize,
35 pub(super) chunk_offsets: PrimitiveArray,
36 pub(super) chunks: Vec<ArrayRef>,
37 pub(super) stats_set: ArrayStats,
38}
39
40impl ChunkedArray {
41 pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
50 Self::validate(&chunks, &dtype)?;
51
52 unsafe { Ok(Self::new_unchecked(chunks, dtype)) }
54 }
55
56 pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
65 #[cfg(debug_assertions)]
66 Self::validate(&chunks, &dtype)
67 .vortex_expect("[Debug Assertion]: Invalid `ChunkedArray` parameters");
68
69 let nchunks = chunks.len();
70
71 let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(nchunks + 1);
72 unsafe { chunk_offsets_buf.push_unchecked(0) }
74 let mut curr_offset = 0;
75 for c in &chunks {
76 curr_offset += c.len() as u64;
77 unsafe { chunk_offsets_buf.push_unchecked(curr_offset) }
79 }
80
81 let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable);
82
83 Self {
84 dtype,
85 len: curr_offset
86 .try_into()
87 .vortex_expect("chunk offset must fit in usize"),
88 chunk_offsets,
89 chunks,
90 stats_set: Default::default(),
91 }
92 }
93
94 pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
98 for chunk in chunks {
99 if chunk.dtype() != dtype {
100 vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
101 }
102 }
103
104 Ok(())
105 }
106
107 #[inline]
108 pub fn chunk(&self, idx: usize) -> &ArrayRef {
109 assert!(idx < self.nchunks(), "chunk index {idx} out of bounds");
110
111 &self.chunks[idx]
112 }
113
114 pub fn nchunks(&self) -> usize {
115 self.chunks.len()
116 }
117
118 #[inline]
119 pub fn chunk_offsets(&self) -> Buffer<u64> {
120 self.chunk_offsets.buffer()
121 }
122
123 pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
124 assert!(index <= self.len(), "Index out of bounds of the array");
125 let index = index as u64;
126
127 let index_chunk = self
130 .chunk_offsets()
131 .search_sorted(&index, SearchSortedSide::Right)
132 .to_ends_index(self.nchunks() + 1)
133 .saturating_sub(1);
134 let chunk_start = self.chunk_offsets()[index_chunk];
135
136 let index_in_chunk =
137 usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
138 (index_chunk, index_in_chunk)
139 }
140
141 pub fn chunks(&self) -> &[ArrayRef] {
142 &self.chunks
143 }
144
145 pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
146 self.chunks().iter().filter(|c| !c.is_empty())
147 }
148
149 pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
150 ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
151 }
152
153 pub fn array_stream(&self) -> impl ArrayStream + '_ {
154 ArrayStreamAdapter::new(
155 self.dtype().clone(),
156 stream::iter(self.chunks().iter().cloned().map(Ok)),
157 )
158 }
159
160 pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
161 let mut new_chunks = Vec::new();
162 let mut chunks_to_combine = Vec::new();
163 let mut new_chunk_n_bytes = 0;
164 let mut new_chunk_n_elements = 0;
165 for chunk in self.chunks() {
166 let n_bytes = chunk.nbytes();
167 let n_elements = chunk.len();
168
169 if (new_chunk_n_bytes + n_bytes > target_bytesize
170 || new_chunk_n_elements + n_elements > target_rowsize)
171 && !chunks_to_combine.is_empty()
172 {
173 new_chunks.push(
174 unsafe {
177 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
178 .to_canonical()
179 .into_array()
180 },
181 );
182
183 new_chunk_n_bytes = 0;
184 new_chunk_n_elements = 0;
185 chunks_to_combine = Vec::new();
186 }
187
188 if n_bytes > target_bytesize || n_elements > target_rowsize {
189 new_chunks.push(chunk.clone());
190 } else {
191 new_chunk_n_bytes += n_bytes;
192 new_chunk_n_elements += n_elements;
193 chunks_to_combine.push(chunk.clone());
194 }
195 }
196
197 if !chunks_to_combine.is_empty() {
198 new_chunks.push(unsafe {
199 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
202 .to_canonical()
203 .into_array()
204 });
205 }
206
207 unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
210 }
211}
212
213impl FromIterator<ArrayRef> for ChunkedArray {
214 fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
215 let chunks: Vec<ArrayRef> = iter.into_iter().collect();
216 let dtype = chunks
217 .first()
218 .map(|c| c.dtype().clone())
219 .vortex_expect("Cannot infer DType from an empty iterator");
220 Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
221 }
222}
223
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_dtype::PType;
    use vortex_error::VortexResult;

    use crate::ArrayRef;
    use crate::IntoArray;
    use crate::ToCanonical;
    use crate::array::Array;
    use crate::arrays::ChunkedVTable;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::chunked::ChunkedArray;
    use crate::assert_arrays_eq;
    use crate::compute::sub_scalar;
    use crate::validity::Validity;

    /// Wraps `chunks` in a non-nullable `u64` chunked array, panicking on failure.
    fn non_nullable_u64(chunks: Vec<ArrayRef>) -> ChunkedArray {
        ChunkedArray::try_new(
            chunks,
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap()
    }

    /// Wraps `chunks` in a nullable `u64` chunked array.
    fn nullable_u64(chunks: Vec<ArrayRef>) -> VortexResult<ChunkedArray> {
        ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))
    }

    /// Three chunks of three values each, holding `1..=9`.
    fn chunked_array() -> ChunkedArray {
        non_nullable_u64(vec![
            buffer![1u64, 2, 3].into_array(),
            buffer![4u64, 5, 6].into_array(),
            buffer![7u64, 8, 9].into_array(),
        ])
    }

    #[test]
    fn test_scalar_subtract() {
        let input = chunked_array().into_array();
        let to_subtract = 1u64;
        let output = sub_scalar(&input, to_subtract.into()).unwrap();

        let output = output.as_::<ChunkedVTable>();
        let chunks_out = output.chunks();

        // Each chunk keeps its position and has one subtracted from every value.
        let expected: [&[u64]; 3] = [&[0, 1, 2], &[3, 4, 5], &[6, 7, 8]];
        for (idx, want) in expected.iter().enumerate() {
            let got = chunks_out[idx].to_primitive().as_slice::<u64>().to_vec();
            assert_eq!(got, *want);
        }
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = non_nullable_u64(vec![buffer![0u64].into_array()]);

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = non_nullable_u64(vec![
            buffer![0u64].into_array(),
            buffer![5u64].into_array(),
        ]);

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        // Both single-row chunks fit well within the targets, so they merge into one.
        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = non_nullable_u64(vec![
            buffer![0u64, 1, 2, 3].into_array(),
            buffer![4u64, 5].into_array(),
        ]);

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // 4 + 2 rows exceeds the 5-row target, so the two chunks stay separate.
        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = non_nullable_u64(vec![
            buffer![0u64, 1, 2].into_array(),
            buffer![42_u64; 6].into_array(),
            buffer![4u64, 5].into_array(),
            buffer![6u64, 7].into_array(),
            buffer![8u64, 9].into_array(),
        ]);

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // Greedy merging with a 5-row target gives:
        // [0, 1, 2] [42 x 6 (oversized, passed through)] [4, 5, 6, 7] [8, 9]
        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Non-empty and empty chunks interleaved, every one marked all-valid.
        let chunked = nullable_u64(vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
        ])?;

        assert!(chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Non-empty and empty chunks interleaved, every one marked all-invalid.
        let chunked = nullable_u64(vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ])?;

        assert!(!chunked.all_valid());
        assert!(chunked.all_invalid());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        // One all-valid and one all-invalid non-empty chunk, each followed by an empty chunk.
        let chunked = nullable_u64(vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ])?;

        assert!(!chunked.all_valid());
        assert!(!chunked.all_invalid());

        Ok(())
    }
}