vortex_array/arrays/chunked/
array.rs1use std::fmt::Debug;
9use std::fmt::Display;
10use std::fmt::Formatter;
11
12use futures::stream;
13use smallvec::SmallVec;
14use vortex_buffer::BufferMut;
15use vortex_error::VortexExpect;
16use vortex_error::VortexResult;
17use vortex_error::vortex_bail;
18
19use crate::ArrayRef;
20use crate::ArraySlots;
21use crate::IntoArray;
22use crate::array::Array;
23use crate::array::ArrayParts;
24use crate::array::TypedArrayRef;
25use crate::arrays::Chunked;
26use crate::arrays::PrimitiveArray;
27use crate::dtype::DType;
28use crate::iter::ArrayIterator;
29use crate::iter::ArrayIteratorAdapter;
30use crate::search_sorted::SearchSorted;
31use crate::search_sorted::SearchSortedSide;
32use crate::stream::ArrayStream;
33use crate::stream::ArrayStreamAdapter;
34use crate::validity::Validity;
35
/// Index of the slot that holds the chunk-offsets array.
pub(super) const CHUNK_OFFSETS_SLOT: usize = 0;
/// Index of the first chunk slot; chunk `i` is stored at slot `CHUNKS_OFFSET + i`.
pub(super) const CHUNKS_OFFSET: usize = 1;
38
/// Encoding-specific data carried by a chunked array.
#[derive(Clone, Debug)]
pub struct ChunkedData {
    /// Cumulative row offsets of the chunks. When built by `compute_chunk_offsets` this starts
    /// with `0` and holds one further entry per chunk, i.e. `nchunks + 1` values.
    pub(super) chunk_offsets: Vec<usize>,
    /// Slot index initialised to `CHUNKS_OFFSET` by `ChunkedData::new` and overwritten by
    /// `with_next_builder_slot`.
    /// NOTE(review): presumably the next chunk slot a builder should consume — confirm against
    /// the builder code, which is not visible here.
    pub(super) next_builder_slot: usize,
}
45
46impl Display for ChunkedData {
47 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48 write!(f, "nchunks: {}", self.chunk_offsets.len().saturating_sub(1))
49 }
50}
51
52pub trait ChunkedArrayExt: TypedArrayRef<Chunked> {
53 fn chunk_offsets_array(&self) -> &ArrayRef {
54 self.as_ref().slots()[CHUNK_OFFSETS_SLOT]
55 .as_ref()
56 .vortex_expect("validated chunk offsets slot")
57 }
58
59 fn nchunks(&self) -> usize {
60 self.as_ref().slots().len().saturating_sub(CHUNKS_OFFSET)
61 }
62
63 fn chunk(&self, idx: usize) -> &ArrayRef {
64 self.as_ref().slots()[CHUNKS_OFFSET + idx]
65 .as_ref()
66 .vortex_expect("validated chunk slot")
67 }
68
69 fn iter_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
70 Box::new(
71 self.as_ref().slots()[CHUNKS_OFFSET..]
72 .iter()
73 .map(|slot| slot.as_ref().vortex_expect("validated chunk slot")),
74 )
75 }
76
77 fn chunks(&self) -> Vec<ArrayRef> {
78 self.iter_chunks().cloned().collect()
79 }
80
81 fn non_empty_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
82 Box::new(self.iter_chunks().filter(|chunk| !chunk.is_empty()))
83 }
84
85 fn chunk_offsets(&self) -> &[usize] {
86 &self.chunk_offsets
87 }
88
89 fn find_chunk_idx(&self, index: usize) -> VortexResult<(usize, usize)> {
90 assert!(
91 index <= self.as_ref().len(),
92 "Index out of bounds of the array"
93 );
94 let chunk_offsets = self.chunk_offsets();
95 let index_chunk = chunk_offsets
96 .search_sorted(&index, SearchSortedSide::Right)?
97 .to_ends_index(self.nchunks() + 1)
98 .saturating_sub(1);
99 let chunk_start = chunk_offsets[index_chunk];
100 let index_in_chunk = index - chunk_start;
101 Ok((index_chunk, index_in_chunk))
102 }
103
104 fn array_iterator(&self) -> impl ArrayIterator + '_ {
105 ArrayIteratorAdapter::new(
106 self.as_ref().dtype().clone(),
107 self.iter_chunks().map(|chunk| Ok(chunk.clone())),
108 )
109 }
110
111 fn array_stream(&self) -> impl ArrayStream + '_ {
112 ArrayStreamAdapter::new(
113 self.as_ref().dtype().clone(),
114 stream::iter(self.iter_chunks().map(|chunk| Ok(chunk.clone()))),
115 )
116 }
117}
/// Blanket implementation: every `TypedArrayRef<Chunked>` gets the `ChunkedArrayExt` methods.
impl<T: TypedArrayRef<Chunked>> ChunkedArrayExt for T {}
119
120impl ChunkedData {
121 pub(super) fn new(chunk_offsets: Vec<usize>) -> Self {
122 Self {
123 chunk_offsets,
124 next_builder_slot: CHUNKS_OFFSET,
125 }
126 }
127
128 pub(super) fn compute_chunk_offsets(chunks: &[ArrayRef]) -> Vec<usize> {
129 let mut chunk_offsets = Vec::with_capacity(chunks.len() + 1);
130 chunk_offsets.push(0);
131 let mut curr_offset = 0;
132 for chunk in chunks {
133 curr_offset += chunk.len();
134 chunk_offsets.push(curr_offset);
135 }
136 chunk_offsets
137 }
138
139 pub(super) fn make_slots(chunk_offsets: &[usize], chunks: &[ArrayRef]) -> ArraySlots {
140 let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(chunk_offsets.len());
141 for &offset in chunk_offsets {
142 let offset = u64::try_from(offset)
143 .vortex_expect("chunk offset must fit in u64 for serialization");
144 unsafe { chunk_offsets_buf.push_unchecked(offset) }
145 }
146
147 let mut slots = SmallVec::with_capacity(1 + chunks.len());
148 slots.push(Some(
149 PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable).into_array(),
150 ));
151 slots.extend(chunks.iter().map(|c| Some(c.clone())));
152 slots
153 }
154
155 pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
159 for chunk in chunks {
160 if chunk.dtype() != dtype {
161 vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
162 }
163 }
164
165 Ok(())
166 }
167}
168
169impl Array<Chunked> {
170 pub(super) fn with_next_builder_slot(mut self, next_builder_slot: usize) -> Self {
171 if let Some(data) = self.data_mut() {
172 data.next_builder_slot = next_builder_slot;
173 return self;
174 }
175 let stats = self.statistics().to_owned();
178 let mut data = self.data().clone();
179 data.next_builder_slot = next_builder_slot;
180 unsafe {
182 Array::from_parts_unchecked(
183 ArrayParts::new(Chunked, self.dtype().clone(), self.len(), data)
184 .with_slots(self.slots().iter().cloned().collect::<ArraySlots>()),
185 )
186 }
187 .with_stats_set(stats)
188 }
189
190 pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
192 ChunkedData::validate(&chunks, &dtype)?;
193 Ok(unsafe { Self::new_unchecked(chunks, dtype) })
195 }
196
197 pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
198 let mut new_chunks = Vec::new();
199 let mut chunks_to_combine = Vec::new();
200 let mut new_chunk_n_bytes = 0;
201 let mut new_chunk_n_elements = 0;
202 for chunk in self.iter_chunks() {
203 let n_bytes = chunk.nbytes();
204 let n_elements = chunk.len();
205
206 if (new_chunk_n_bytes + n_bytes > target_bytesize
207 || new_chunk_n_elements + n_elements > target_rowsize)
208 && !chunks_to_combine.is_empty()
209 {
210 #[expect(deprecated)]
211 let canonical = unsafe {
212 Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone())
213 }
214 .into_array()
215 .to_canonical()?
216 .into_array();
217 new_chunks.push(canonical);
218
219 new_chunk_n_bytes = 0;
220 new_chunk_n_elements = 0;
221 chunks_to_combine = Vec::new();
222 }
223
224 if n_bytes > target_bytesize || n_elements > target_rowsize {
225 new_chunks.push(chunk.clone());
226 } else {
227 new_chunk_n_bytes += n_bytes;
228 new_chunk_n_elements += n_elements;
229 chunks_to_combine.push(chunk.clone());
230 }
231 }
232
233 if !chunks_to_combine.is_empty() {
234 #[expect(deprecated)]
235 let canonical =
236 unsafe { Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone()) }
237 .into_array()
238 .to_canonical()?
239 .into_array();
240 new_chunks.push(canonical);
241 }
242
243 unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
244 }
245
246 pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
252 let len = chunks.iter().map(|chunk| chunk.len()).sum();
253 let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
254 unsafe {
255 Array::from_parts_unchecked(
256 ArrayParts::new(Chunked, dtype, len, ChunkedData::new(chunk_offsets.clone()))
257 .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
258 )
259 }
260 }
261}
262
263impl FromIterator<ArrayRef> for Array<Chunked> {
264 fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
265 let chunks: Vec<ArrayRef> = iter.into_iter().collect();
266 let dtype = chunks
267 .first()
268 .map(|c| c.dtype().clone())
269 .vortex_expect("Cannot infer DType from an empty iterator");
270 Array::<Chunked>::try_new(chunks, dtype)
271 .vortex_expect("Failed to create chunked array from iterator")
272 }
273}
274
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::ChunkedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::chunked::ChunkedArrayExt;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::validity::Validity;

    /// Dtype used by the rechunk tests.
    fn non_nullable_u64() -> DType {
        DType::Primitive(PType::U64, Nullability::NonNullable)
    }

    /// Dtype used by the validity tests.
    fn nullable_u64() -> DType {
        DType::Primitive(PType::U64, Nullability::Nullable)
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunks = vec![buffer![0u64].into_array()];
        let chunked = ChunkedArray::try_new(chunks, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunks = vec![buffer![0u64].into_array(), buffer![5u64].into_array()];
        let chunked = ChunkedArray::try_new(chunks, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        // Both chunks fit comfortably within the targets, so they are merged into one.
        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunks = vec![
            buffer![0u64, 1, 2, 3].into_array(),
            buffer![4u64, 5].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // 4 + 2 rows exceed the 5-row target, so the two chunks stay separate.
        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.iter_chunks().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunks = vec![
            buffer![0u64, 1, 2].into_array(),
            buffer![42_u64; 6].into_array(),
            buffer![4u64, 5].into_array(),
            buffer![6u64, 7].into_array(),
            buffer![8u64, 9].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, non_nullable_u64()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // Expected row counts: [3], [6] (oversized, passed through), [2 + 2], [2].
        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            // Empty chunk between two populated ones.
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            // Trailing empty chunk.
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, nullable_u64())?;

        let all_valid = chunked.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?;
        assert!(all_valid);

        let all_invalid = chunked
            .into_array()
            .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())?;
        assert!(!all_invalid);

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            // Empty chunk between two populated ones.
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            // Trailing empty chunk.
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, nullable_u64())?;

        let all_valid = chunked.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?;
        assert!(!all_valid);

        let all_invalid = chunked
            .into_array()
            .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())?;
        assert!(all_invalid);

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            // Empty chunk marked valid.
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            // Empty chunk marked invalid.
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, nullable_u64())?;

        // One populated chunk is valid and one is invalid, so neither predicate holds.
        let all_valid = chunked.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?;
        assert!(!all_valid);

        let all_invalid = chunked
            .into_array()
            .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())?;
        assert!(!all_invalid);

        Ok(())
    }
}