// vortex_array/arrays/chunked/array.rs

use std::fmt::Debug;
9use std::fmt::Display;
10use std::fmt::Formatter;
11
12use futures::stream;
13use vortex_buffer::BufferMut;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17
18use crate::ArrayRef;
19use crate::IntoArray;
20use crate::array::Array;
21use crate::array::ArrayParts;
22use crate::array::TypedArrayRef;
23use crate::arrays::Chunked;
24use crate::arrays::PrimitiveArray;
25use crate::dtype::DType;
26use crate::iter::ArrayIterator;
27use crate::iter::ArrayIteratorAdapter;
28use crate::search_sorted::SearchSorted;
29use crate::search_sorted::SearchSortedSide;
30use crate::stream::ArrayStream;
31use crate::stream::ArrayStreamAdapter;
32use crate::validity::Validity;
33
/// Slot index holding the serialized chunk-offsets array.
pub(super) const CHUNK_OFFSETS_SLOT: usize = 0;
/// Index of the first chunk slot; chunk `i` lives at slot `CHUNKS_OFFSET + i`.
pub(super) const CHUNKS_OFFSET: usize = 1;
36
/// Metadata for a chunked array: the logical row offset of each chunk boundary.
#[derive(Clone, Debug)]
pub struct ChunkedData {
    // Entry i is the starting row of chunk i; the final entry is the total length,
    // so the vector holds nchunks + 1 offsets (see `compute_chunk_offsets`).
    pub(super) chunk_offsets: Vec<usize>,
}
41
42impl Display for ChunkedData {
43 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44 write!(f, "nchunks: {}", self.chunk_offsets.len().saturating_sub(1))
45 }
46}
47
/// Convenience accessors for arrays whose encoding is [`Chunked`].
///
/// Slot layout (see `make_slots`): slot `CHUNK_OFFSETS_SLOT` holds the serialized
/// chunk-offsets array; slots `CHUNKS_OFFSET..` hold the chunks themselves.
pub trait ChunkedArrayExt: TypedArrayRef<Chunked> {
    /// Returns the array stored in the chunk-offsets slot.
    ///
    /// # Panics
    /// Panics if the slot is empty; construction (`make_slots`) always fills it.
    fn chunk_offsets_array(&self) -> &ArrayRef {
        self.as_ref().slots()[CHUNK_OFFSETS_SLOT]
            .as_ref()
            .vortex_expect("validated chunk offsets slot")
    }

    /// Number of chunks: total slot count minus the leading offsets slot.
    fn nchunks(&self) -> usize {
        self.as_ref().slots().len().saturating_sub(CHUNKS_OFFSET)
    }

    /// Returns the `idx`-th chunk.
    ///
    /// # Panics
    /// Panics if `idx >= self.nchunks()` (slot indexing) or the slot is empty.
    fn chunk(&self, idx: usize) -> &ArrayRef {
        self.as_ref().slots()[CHUNKS_OFFSET + idx]
            .as_ref()
            .vortex_expect("validated chunk slot")
    }

    /// Iterates over all chunks in order.
    fn iter_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
        Box::new(
            self.as_ref().slots()[CHUNKS_OFFSET..]
                .iter()
                .map(|slot| slot.as_ref().vortex_expect("validated chunk slot")),
        )
    }

    /// Collects owned clones of all chunks.
    fn chunks(&self) -> Vec<ArrayRef> {
        self.iter_chunks().cloned().collect()
    }

    /// Iterates over chunks, skipping zero-length ones.
    fn non_empty_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
        Box::new(self.iter_chunks().filter(|chunk| !chunk.is_empty()))
    }

    /// The logical row offset of each chunk boundary; length is `nchunks() + 1`.
    fn chunk_offsets(&self) -> &[usize] {
        // NOTE(review): `self.chunk_offsets` resolves through deref/field access not
        // visible in this file — presumably to `ChunkedData::chunk_offsets`; confirm.
        &self.chunk_offsets
    }

    /// Maps a logical row `index` to `(chunk_index, index_within_chunk)`.
    ///
    /// # Panics
    /// Panics if `index > self.len()` (`index == len()` is permitted by the assert).
    fn find_chunk_idx(&self, index: usize) -> VortexResult<(usize, usize)> {
        assert!(
            index <= self.as_ref().len(),
            "Index out of bounds of the array"
        );
        let chunk_offsets = self.chunk_offsets();
        // Right-sided search so an index sitting exactly on a chunk boundary resolves
        // to the following chunk; saturating_sub(1) converts the offset position into
        // a chunk id without underflowing at index 0.
        let index_chunk = chunk_offsets
            .search_sorted(&index, SearchSortedSide::Right)?
            .to_ends_index(self.nchunks() + 1)
            .saturating_sub(1);
        let chunk_start = chunk_offsets[index_chunk];
        let index_in_chunk = index - chunk_start;
        Ok((index_chunk, index_in_chunk))
    }

    /// Adapts the chunks into an [`ArrayIterator`], yielding each chunk in order.
    fn array_iterator(&self) -> impl ArrayIterator + '_ {
        ArrayIteratorAdapter::new(
            self.as_ref().dtype().clone(),
            self.iter_chunks().map(|chunk| Ok(chunk.clone())),
        )
    }

    /// Adapts the chunks into an [`ArrayStream`], yielding each chunk in order.
    fn array_stream(&self) -> impl ArrayStream + '_ {
        ArrayStreamAdapter::new(
            self.as_ref().dtype().clone(),
            stream::iter(self.iter_chunks().map(|chunk| Ok(chunk.clone()))),
        )
    }
}

// Blanket impl: every typed reference to a Chunked array gets the extension methods.
impl<T: TypedArrayRef<Chunked>> ChunkedArrayExt for T {}
115
116impl ChunkedData {
117 pub(super) fn compute_chunk_offsets(chunks: &[ArrayRef]) -> Vec<usize> {
118 let mut chunk_offsets = Vec::with_capacity(chunks.len() + 1);
119 chunk_offsets.push(0);
120 let mut curr_offset = 0;
121 for chunk in chunks {
122 curr_offset += chunk.len();
123 chunk_offsets.push(curr_offset);
124 }
125 chunk_offsets
126 }
127
128 pub(super) fn make_slots(
129 chunk_offsets: &[usize],
130 chunks: &[ArrayRef],
131 ) -> Vec<Option<ArrayRef>> {
132 let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(chunk_offsets.len());
133 for &offset in chunk_offsets {
134 let offset = u64::try_from(offset)
135 .vortex_expect("chunk offset must fit in u64 for serialization");
136 unsafe { chunk_offsets_buf.push_unchecked(offset) }
137 }
138
139 let mut slots = Vec::with_capacity(1 + chunks.len());
140 slots.push(Some(
141 PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable).into_array(),
142 ));
143 slots.extend(chunks.iter().map(|c| Some(c.clone())));
144 slots
145 }
146
147 pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
151 for chunk in chunks {
152 if chunk.dtype() != dtype {
153 vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
154 }
155 }
156
157 Ok(())
158 }
159}
160
161impl Array<Chunked> {
162 pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
164 ChunkedData::validate(&chunks, &dtype)?;
165 let len = chunks.iter().map(|chunk| chunk.len()).sum();
166 let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
167 Ok(unsafe {
168 Array::from_parts_unchecked(
169 ArrayParts::new(
170 Chunked,
171 dtype,
172 len,
173 ChunkedData {
174 chunk_offsets: chunk_offsets.clone(),
175 },
176 )
177 .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
178 )
179 })
180 }
181
182 pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
183 let mut new_chunks = Vec::new();
184 let mut chunks_to_combine = Vec::new();
185 let mut new_chunk_n_bytes = 0;
186 let mut new_chunk_n_elements = 0;
187 for chunk in self.iter_chunks() {
188 let n_bytes = chunk.nbytes();
189 let n_elements = chunk.len();
190
191 if (new_chunk_n_bytes + n_bytes > target_bytesize
192 || new_chunk_n_elements + n_elements > target_rowsize)
193 && !chunks_to_combine.is_empty()
194 {
195 new_chunks.push(
196 unsafe {
197 Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone())
198 }
199 .into_array()
200 .to_canonical()?
201 .into_array(),
202 );
203
204 new_chunk_n_bytes = 0;
205 new_chunk_n_elements = 0;
206 chunks_to_combine = Vec::new();
207 }
208
209 if n_bytes > target_bytesize || n_elements > target_rowsize {
210 new_chunks.push(chunk.clone());
211 } else {
212 new_chunk_n_bytes += n_bytes;
213 new_chunk_n_elements += n_elements;
214 chunks_to_combine.push(chunk.clone());
215 }
216 }
217
218 if !chunks_to_combine.is_empty() {
219 new_chunks.push(
220 unsafe { Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone()) }
221 .into_array()
222 .to_canonical()?
223 .into_array(),
224 );
225 }
226
227 unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
228 }
229
230 pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
236 let len = chunks.iter().map(|chunk| chunk.len()).sum();
237 let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
238 unsafe {
239 Array::from_parts_unchecked(
240 ArrayParts::new(
241 Chunked,
242 dtype,
243 len,
244 ChunkedData {
245 chunk_offsets: chunk_offsets.clone(),
246 },
247 )
248 .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
249 )
250 }
251 }
252}
253
254impl FromIterator<ArrayRef> for Array<Chunked> {
255 fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
256 let chunks: Vec<ArrayRef> = iter.into_iter().collect();
257 let dtype = chunks
258 .first()
259 .map(|c| c.dtype().clone())
260 .vortex_expect("Cannot infer DType from an empty iterator");
261 Array::<Chunked>::try_new(chunks, dtype)
262 .vortex_expect("Failed to create chunked array from iterator")
263 }
264}
265
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::ChunkedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::chunked::ChunkedArrayExt;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::validity::Validity;

    /// Rechunking a single chunk that fits both targets must preserve contents.
    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    /// Two tiny chunks under both targets should combine into a single chunk.
    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    /// With a row target of 5, chunks of 4 and 2 rows cannot combine (4 + 2 > 5),
    /// so the original chunking is kept and every chunk stays under the target.
    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.iter_chunks().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    /// A chunk larger than the row target (6 > 5) is passed through unsplit.
    /// Expected layout: [3], [6 pass-through], [2 + 2 combined], [2] = 4 chunks.
    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    /// Interleaved empty chunks must not break validity aggregation when every
    /// element is valid.
    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        assert!(
            chunked
                .all_valid(&mut LEGACY_SESSION.create_execution_ctx())
                .unwrap()
        );
        assert!(
            !chunked
                .into_array()
                .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())
                .unwrap()
        );

        Ok(())
    }

    /// Interleaved empty chunks must not break validity aggregation when every
    /// element is invalid.
    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        assert!(
            !chunked
                .all_valid(&mut LEGACY_SESSION.create_execution_ctx())
                .unwrap()
        );
        assert!(
            chunked
                .into_array()
                .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())
                .unwrap()
        );

        Ok(())
    }

    /// Mixed validity across chunks: neither all-valid nor all-invalid holds.
    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        assert!(
            !chunked
                .all_valid(&mut LEGACY_SESSION.create_execution_ctx())
                .unwrap()
        );
        assert!(
            !chunked
                .into_array()
                .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())
                .unwrap()
        );

        Ok(())
    }
}
435}