vortex_array/arrays/chunked/
array.rs1use std::fmt::Debug;
9use std::fmt::Display;
10use std::fmt::Formatter;
11
12use futures::stream;
13use vortex_buffer::BufferMut;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17
18use crate::ArrayRef;
19use crate::IntoArray;
20use crate::array::Array;
21use crate::array::ArrayParts;
22use crate::array::TypedArrayRef;
23use crate::arrays::Chunked;
24use crate::arrays::PrimitiveArray;
25use crate::dtype::DType;
26use crate::iter::ArrayIterator;
27use crate::iter::ArrayIteratorAdapter;
28use crate::search_sorted::SearchSorted;
29use crate::search_sorted::SearchSortedSide;
30use crate::stream::ArrayStream;
31use crate::stream::ArrayStreamAdapter;
32use crate::validity::Validity;
33
/// Slot index holding the chunk-offsets array (row offsets of each chunk boundary).
pub(super) const CHUNK_OFFSETS_SLOT: usize = 0;
/// Slot index of the first chunk; the chunks themselves occupy slots `CHUNKS_OFFSET..`.
pub(super) const CHUNKS_OFFSET: usize = 1;
36
/// Metadata backing a [`Chunked`] array.
#[derive(Clone, Debug)]
pub struct ChunkedData {
    /// Running row offsets of the chunk boundaries: `chunk_offsets[i]` is the first
    /// row of chunk `i`, and the final entry is the total row count (see
    /// `compute_chunk_offsets`), so there are `nchunks + 1` entries.
    pub(super) chunk_offsets: Vec<usize>,
    /// Index of the next slot a builder should write a chunk into; initialized to
    /// `CHUNKS_OFFSET` in `ChunkedData::new`.
    pub(super) next_builder_slot: usize,
}
43
44impl Display for ChunkedData {
45 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
46 write!(f, "nchunks: {}", self.chunk_offsets.len().saturating_sub(1))
47 }
48}
49
/// Extension methods for chunked arrays, available on any typed reference to a
/// [`Chunked`] array.
///
/// Chunk data lives in the array's slots: slot `CHUNK_OFFSETS_SLOT` holds the
/// chunk-offsets array and slots `CHUNKS_OFFSET..` hold the chunks themselves.
pub trait ChunkedArrayExt: TypedArrayRef<Chunked> {
    /// Returns the chunk-offsets array stored in slot `CHUNK_OFFSETS_SLOT`.
    fn chunk_offsets_array(&self) -> &ArrayRef {
        self.as_ref().slots()[CHUNK_OFFSETS_SLOT]
            .as_ref()
            .vortex_expect("validated chunk offsets slot")
    }

    /// Number of chunks in the array.
    fn nchunks(&self) -> usize {
        self.as_ref().slots().len().saturating_sub(CHUNKS_OFFSET)
    }

    /// Returns the chunk at `idx`.
    ///
    /// Panics if `idx >= self.nchunks()` (slot indexing is bounds-checked).
    fn chunk(&self, idx: usize) -> &ArrayRef {
        self.as_ref().slots()[CHUNKS_OFFSET + idx]
            .as_ref()
            .vortex_expect("validated chunk slot")
    }

    /// Iterates over all chunks in order.
    fn iter_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
        Box::new(
            self.as_ref().slots()[CHUNKS_OFFSET..]
                .iter()
                .map(|slot| slot.as_ref().vortex_expect("validated chunk slot")),
        )
    }

    /// Collects clones of all chunks into a `Vec`.
    fn chunks(&self) -> Vec<ArrayRef> {
        self.iter_chunks().cloned().collect()
    }

    /// Iterates over the chunks, skipping zero-length ones.
    fn non_empty_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
        Box::new(self.iter_chunks().filter(|chunk| !chunk.is_empty()))
    }

    /// Row offsets of the chunk boundaries (`nchunks() + 1` entries, starting at 0).
    // NOTE(review): unlike the other methods this accesses `self.chunk_offsets`
    // directly rather than via `self.as_ref()` — presumably resolved through a
    // Deref from the typed array to `ChunkedData`; confirm this is intentional.
    fn chunk_offsets(&self) -> &[usize] {
        &self.chunk_offsets
    }

    /// Maps a row `index` into `(chunk_index, index_within_chunk)`.
    ///
    /// # Panics
    ///
    /// Panics if `index > self.len()` (`index == len()` is allowed).
    fn find_chunk_idx(&self, index: usize) -> VortexResult<(usize, usize)> {
        assert!(
            index <= self.as_ref().len(),
            "Index out of bounds of the array"
        );
        let chunk_offsets = self.chunk_offsets();
        // Right-biased search so an index landing exactly on a chunk boundary
        // maps to the start of the following chunk.
        let index_chunk = chunk_offsets
            .search_sorted(&index, SearchSortedSide::Right)?
            .to_ends_index(self.nchunks() + 1)
            .saturating_sub(1);
        let chunk_start = chunk_offsets[index_chunk];
        let index_in_chunk = index - chunk_start;
        Ok((index_chunk, index_in_chunk))
    }

    /// Adapts the chunks into an [`ArrayIterator`] yielding cloned chunks.
    fn array_iterator(&self) -> impl ArrayIterator + '_ {
        ArrayIteratorAdapter::new(
            self.as_ref().dtype().clone(),
            self.iter_chunks().map(|chunk| Ok(chunk.clone())),
        )
    }

    /// Adapts the chunks into an [`ArrayStream`] yielding cloned chunks.
    fn array_stream(&self) -> impl ArrayStream + '_ {
        ArrayStreamAdapter::new(
            self.as_ref().dtype().clone(),
            stream::iter(self.iter_chunks().map(|chunk| Ok(chunk.clone()))),
        )
    }
}
116impl<T: TypedArrayRef<Chunked>> ChunkedArrayExt for T {}
117
118impl ChunkedData {
119 pub(super) fn new(chunk_offsets: Vec<usize>) -> Self {
120 Self {
121 chunk_offsets,
122 next_builder_slot: CHUNKS_OFFSET,
123 }
124 }
125
126 pub(super) fn compute_chunk_offsets(chunks: &[ArrayRef]) -> Vec<usize> {
127 let mut chunk_offsets = Vec::with_capacity(chunks.len() + 1);
128 chunk_offsets.push(0);
129 let mut curr_offset = 0;
130 for chunk in chunks {
131 curr_offset += chunk.len();
132 chunk_offsets.push(curr_offset);
133 }
134 chunk_offsets
135 }
136
137 pub(super) fn make_slots(
138 chunk_offsets: &[usize],
139 chunks: &[ArrayRef],
140 ) -> Vec<Option<ArrayRef>> {
141 let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(chunk_offsets.len());
142 for &offset in chunk_offsets {
143 let offset = u64::try_from(offset)
144 .vortex_expect("chunk offset must fit in u64 for serialization");
145 unsafe { chunk_offsets_buf.push_unchecked(offset) }
146 }
147
148 let mut slots = Vec::with_capacity(1 + chunks.len());
149 slots.push(Some(
150 PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable).into_array(),
151 ));
152 slots.extend(chunks.iter().map(|c| Some(c.clone())));
153 slots
154 }
155
156 pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
160 for chunk in chunks {
161 if chunk.dtype() != dtype {
162 vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
163 }
164 }
165
166 Ok(())
167 }
168}
169
impl Array<Chunked> {
    /// Returns `self` with `next_builder_slot` updated in its `ChunkedData`.
    ///
    /// Mutates in place when `data_mut()` grants unique access; otherwise clones
    /// the data, rebuilds the array from the same parts/slots, and carries the
    /// statistics over.
    pub(super) fn with_next_builder_slot(mut self, next_builder_slot: usize) -> Self {
        if let Some(data) = self.data_mut() {
            data.next_builder_slot = next_builder_slot;
            return self;
        }
        // Shared data: rebuild around a modified copy, preserving statistics.
        let stats = self.statistics().to_owned();
        let mut data = self.data().clone();
        data.next_builder_slot = next_builder_slot;
        // SAFETY: dtype, len, and slots are copied verbatim from an
        // already-constructed array; only builder bookkeeping changes.
        unsafe {
            Array::from_parts_unchecked(
                ArrayParts::new(Chunked, self.dtype().clone(), self.len(), data)
                    .with_slots(self.slots().to_vec()),
            )
        }
        .with_stats_set(stats)
    }

    /// Creates a chunked array after validating that every chunk has `dtype`.
    ///
    /// # Errors
    ///
    /// Returns an error if any chunk's dtype differs from `dtype`.
    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
        ChunkedData::validate(&chunks, &dtype)?;
        Ok(unsafe { Self::new_unchecked(chunks, dtype) })
    }

    /// Re-partitions the chunks so that each combined chunk stays within both
    /// `target_bytesize` bytes and `target_rowsize` elements.
    ///
    /// Runs of small chunks are canonicalized into a single chunk; a chunk that
    /// individually exceeds either target is passed through unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if canonicalizing a run of chunks fails.
    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
        let mut new_chunks = Vec::new();
        let mut chunks_to_combine = Vec::new();
        let mut new_chunk_n_bytes = 0;
        let mut new_chunk_n_elements = 0;
        for chunk in self.iter_chunks() {
            let n_bytes = chunk.nbytes();
            let n_elements = chunk.len();

            // Adding this chunk would overflow a target: flush the pending run
            // first so ordering is preserved.
            if (new_chunk_n_bytes + n_bytes > target_bytesize
                || new_chunk_n_elements + n_elements > target_rowsize)
                && !chunks_to_combine.is_empty()
            {
                #[expect(deprecated)]
                let canonical = unsafe {
                    Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone())
                }
                .into_array()
                .to_canonical()?
                .into_array();
                new_chunks.push(canonical);

                new_chunk_n_bytes = 0;
                new_chunk_n_elements = 0;
                chunks_to_combine = Vec::new();
            }

            if n_bytes > target_bytesize || n_elements > target_rowsize {
                // Oversized chunk: keep it as-is rather than combining.
                new_chunks.push(chunk.clone());
            } else {
                new_chunk_n_bytes += n_bytes;
                new_chunk_n_elements += n_elements;
                chunks_to_combine.push(chunk.clone());
            }
        }

        // Flush any trailing run of small chunks.
        if !chunks_to_combine.is_empty() {
            #[expect(deprecated)]
            let canonical =
                unsafe { Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone()) }
                    .into_array()
                    .to_canonical()?
                    .into_array();
            new_chunks.push(canonical);
        }

        // SAFETY: every new chunk is either an original chunk or a
        // canonicalization of original chunks, so all share `self`'s dtype.
        unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
    }

    /// Builds a chunked array without validating the chunks.
    ///
    /// # Safety
    ///
    /// The caller must ensure every chunk has exactly `dtype` — the invariant
    /// otherwise enforced by [`ChunkedData::validate`] via [`Self::try_new`].
    pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
        let len = chunks.iter().map(|chunk| chunk.len()).sum();
        let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
        unsafe {
            Array::from_parts_unchecked(
                ArrayParts::new(Chunked, dtype, len, ChunkedData::new(chunk_offsets.clone()))
                    .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
            )
        }
    }
}
263
264impl FromIterator<ArrayRef> for Array<Chunked> {
265 fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
266 let chunks: Vec<ArrayRef> = iter.into_iter().collect();
267 let dtype = chunks
268 .first()
269 .map(|c| c.dtype().clone())
270 .vortex_expect("Cannot infer DType from an empty iterator");
271 Array::<Chunked>::try_new(chunks, dtype)
272 .vortex_expect("Failed to create chunked array from iterator")
273 }
274}
275
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::ChunkedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::chunked::ChunkedArrayExt;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::validity::Validity;

    #[test]
    fn test_rechunk_one_chunk() {
        let dtype = DType::Primitive(PType::U64, Nullability::NonNullable);
        let chunked = ChunkedArray::try_new(vec![buffer![0u64].into_array()], dtype).unwrap();

        // Generous targets: a single chunk is left untouched.
        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let dtype = DType::Primitive(PType::U64, Nullability::NonNullable);
        let chunks = vec![buffer![0u64].into_array(), buffer![5u64].into_array()];
        let chunked = ChunkedArray::try_new(chunks, dtype).unwrap();

        // Both tiny chunks fit within the targets and are merged into one.
        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let dtype = DType::Primitive(PType::U64, Nullability::NonNullable);
        let chunks = vec![
            buffer![0u64, 1, 2, 3].into_array(),
            buffer![4u64, 5].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, dtype).unwrap();

        // A row target of 5 prevents the six rows from collapsing into one chunk.
        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.iter_chunks().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let dtype = DType::Primitive(PType::U64, Nullability::NonNullable);
        let chunks = vec![
            buffer![0u64, 1, 2].into_array(),
            // Six rows exceeds the row target of 5 and must pass through intact.
            buffer![42_u64; 6].into_array(),
            buffer![4u64, 5].into_array(),
            buffer![6u64, 7].into_array(),
            buffer![8u64, 9].into_array(),
        ];
        let chunked = ChunkedArray::try_new(chunks, dtype).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Empty chunks interleaved with all-valid chunks must not affect validity.
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        assert!(chunked.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?);
        assert!(
            !chunked
                .into_array()
                .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())?
        );

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Empty chunks interleaved with all-invalid chunks must not affect validity.
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        assert!(!chunked.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?);
        assert!(
            chunked
                .into_array()
                .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())?
        );

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Mixed validity: the array is neither all-valid nor all-invalid.
        assert!(!chunked.all_valid(&mut LEGACY_SESSION.create_execution_ctx())?);
        assert!(
            !chunked
                .into_array()
                .all_invalid(&mut LEGACY_SESSION.create_execution_ctx())?
        );

        Ok(())
    }
}