// vortex_array/arrays/chunked/array.rs
use std::fmt::Debug;
9use std::fmt::Display;
10use std::fmt::Formatter;
11
12use futures::stream;
13use vortex_buffer::BufferMut;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17
18use crate::ArrayRef;
19use crate::IntoArray;
20use crate::array::Array;
21use crate::array::ArrayParts;
22use crate::array::TypedArrayRef;
23use crate::arrays::Chunked;
24use crate::arrays::PrimitiveArray;
25use crate::dtype::DType;
26use crate::iter::ArrayIterator;
27use crate::iter::ArrayIteratorAdapter;
28use crate::search_sorted::SearchSorted;
29use crate::search_sorted::SearchSortedSide;
30use crate::stream::ArrayStream;
31use crate::stream::ArrayStreamAdapter;
32use crate::validity::Validity;
33
/// Slot index of the serialized chunk-offsets array (a non-nullable u64 `PrimitiveArray`).
pub(super) const CHUNK_OFFSETS_SLOT: usize = 0;
/// Slot index at which the chunk arrays themselves begin.
pub(super) const CHUNKS_OFFSET: usize = 1;

/// Metadata for a chunked array: the cumulative element offsets of its chunks.
#[derive(Clone, Debug)]
pub struct ChunkedData {
    // chunk_offsets[i] is the index of the first element of chunk i; the final
    // entry is the total length, so the vector holds nchunks + 1 offsets.
    pub(super) chunk_offsets: Vec<usize>,
}
41
42impl Display for ChunkedData {
43 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44 write!(f, "nchunks: {}", self.chunk_offsets.len().saturating_sub(1))
45 }
46}
47
/// Extension methods for arrays with the `Chunked` encoding, exposing per-chunk
/// access on top of the generic slot storage.
pub trait ChunkedArrayExt: TypedArrayRef<Chunked> {
    /// Returns the slot holding the serialized chunk offsets.
    fn chunk_offsets_array(&self) -> &ArrayRef {
        self.as_ref().slots()[CHUNK_OFFSETS_SLOT]
            .as_ref()
            .vortex_expect("validated chunk offsets slot")
    }

    /// Number of chunks: the slot count minus the leading offsets slot.
    fn nchunks(&self) -> usize {
        self.as_ref().slots().len().saturating_sub(CHUNKS_OFFSET)
    }

    /// Returns chunk `idx`. Panics if `idx >= nchunks()` (slot indexing).
    fn chunk(&self, idx: usize) -> &ArrayRef {
        self.as_ref().slots()[CHUNKS_OFFSET + idx]
            .as_ref()
            .vortex_expect("validated chunk slot")
    }

    /// Iterates over all chunks in order, borrowing them from the slots.
    fn iter_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
        Box::new(
            self.as_ref().slots()[CHUNKS_OFFSET..]
                .iter()
                .map(|slot| slot.as_ref().vortex_expect("validated chunk slot")),
        )
    }

    /// Collects clones of all chunks into a `Vec`.
    fn chunks(&self) -> Vec<ArrayRef> {
        self.iter_chunks().cloned().collect()
    }

    /// Iterates over chunks, skipping zero-length ones.
    fn non_empty_chunks<'a>(&'a self) -> Box<dyn Iterator<Item = &'a ArrayRef> + 'a> {
        Box::new(self.iter_chunks().filter(|chunk| !chunk.is_empty()))
    }

    /// The cumulative element offsets: nchunks + 1 entries, starting at 0 and
    /// ending at the total length.
    // NOTE(review): the bare `self.chunk_offsets` field access relies on a Deref
    // chain from `Self` down to `ChunkedData` — confirm `TypedArrayRef<Chunked>`
    // provides it; it is not visible in this file.
    fn chunk_offsets(&self) -> &[usize] {
        &self.chunk_offsets
    }

    /// Maps a logical `index` into `(chunk index, index within that chunk)`.
    ///
    /// Panics if `index > len()`. Otherwise binary-searches the chunk offsets;
    /// errors from the search are propagated.
    fn find_chunk_idx(&self, index: usize) -> VortexResult<(usize, usize)> {
        assert!(
            index <= self.as_ref().len(),
            "Index out of bounds of the array"
        );
        let chunk_offsets = self.chunk_offsets();
        // Right-biased search plus saturating_sub(1) selects the chunk whose
        // range contains `index`: an offset exactly equal to `index` means the
        // element belongs to the following chunk.
        let index_chunk = chunk_offsets
            .search_sorted(&index, SearchSortedSide::Right)?
            .to_ends_index(self.nchunks() + 1)
            .saturating_sub(1);
        let chunk_start = chunk_offsets[index_chunk];
        let index_in_chunk = index - chunk_start;
        Ok((index_chunk, index_in_chunk))
    }

    /// Adapts the chunks into a synchronous `ArrayIterator` of owned chunk clones.
    fn array_iterator(&self) -> impl ArrayIterator + '_ {
        ArrayIteratorAdapter::new(
            self.as_ref().dtype().clone(),
            self.iter_chunks().map(|chunk| Ok(chunk.clone())),
        )
    }

    /// Adapts the chunks into an `ArrayStream` of owned chunk clones.
    fn array_stream(&self) -> impl ArrayStream + '_ {
        ArrayStreamAdapter::new(
            self.as_ref().dtype().clone(),
            stream::iter(self.iter_chunks().map(|chunk| Ok(chunk.clone()))),
        )
    }
}
// Blanket impl: every typed chunked array reference gets the extension methods.
impl<T: TypedArrayRef<Chunked>> ChunkedArrayExt for T {}
115
116impl ChunkedData {
117 pub(super) fn compute_chunk_offsets(chunks: &[ArrayRef]) -> Vec<usize> {
118 let mut chunk_offsets = Vec::with_capacity(chunks.len() + 1);
119 chunk_offsets.push(0);
120 let mut curr_offset = 0;
121 for chunk in chunks {
122 curr_offset += chunk.len();
123 chunk_offsets.push(curr_offset);
124 }
125 chunk_offsets
126 }
127
128 pub(super) fn make_slots(
129 chunk_offsets: &[usize],
130 chunks: &[ArrayRef],
131 ) -> Vec<Option<ArrayRef>> {
132 let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(chunk_offsets.len());
133 for &offset in chunk_offsets {
134 let offset = u64::try_from(offset)
135 .vortex_expect("chunk offset must fit in u64 for serialization");
136 unsafe { chunk_offsets_buf.push_unchecked(offset) }
137 }
138
139 let mut slots = Vec::with_capacity(1 + chunks.len());
140 slots.push(Some(
141 PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable).into_array(),
142 ));
143 slots.extend(chunks.iter().map(|c| Some(c.clone())));
144 slots
145 }
146
147 pub fn validate(chunks: &[ArrayRef], dtype: &DType) -> VortexResult<()> {
151 for chunk in chunks {
152 if chunk.dtype() != dtype {
153 vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
154 }
155 }
156
157 Ok(())
158 }
159}
160
161impl Array<Chunked> {
162 pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
164 ChunkedData::validate(&chunks, &dtype)?;
165 let len = chunks.iter().map(|chunk| chunk.len()).sum();
166 let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
167 Ok(unsafe {
168 Array::from_parts_unchecked(
169 ArrayParts::new(
170 Chunked,
171 dtype,
172 len,
173 ChunkedData {
174 chunk_offsets: chunk_offsets.clone(),
175 },
176 )
177 .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
178 )
179 })
180 }
181
182 pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
183 let mut new_chunks = Vec::new();
184 let mut chunks_to_combine = Vec::new();
185 let mut new_chunk_n_bytes = 0;
186 let mut new_chunk_n_elements = 0;
187 for chunk in self.iter_chunks() {
188 let n_bytes = chunk.nbytes();
189 let n_elements = chunk.len();
190
191 if (new_chunk_n_bytes + n_bytes > target_bytesize
192 || new_chunk_n_elements + n_elements > target_rowsize)
193 && !chunks_to_combine.is_empty()
194 {
195 new_chunks.push(
196 unsafe {
197 Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone())
198 }
199 .into_array()
200 .to_canonical()?
201 .into_array(),
202 );
203
204 new_chunk_n_bytes = 0;
205 new_chunk_n_elements = 0;
206 chunks_to_combine = Vec::new();
207 }
208
209 if n_bytes > target_bytesize || n_elements > target_rowsize {
210 new_chunks.push(chunk.clone());
211 } else {
212 new_chunk_n_bytes += n_bytes;
213 new_chunk_n_elements += n_elements;
214 chunks_to_combine.push(chunk.clone());
215 }
216 }
217
218 if !chunks_to_combine.is_empty() {
219 new_chunks.push(
220 unsafe { Array::<Chunked>::new_unchecked(chunks_to_combine, self.dtype().clone()) }
221 .into_array()
222 .to_canonical()?
223 .into_array(),
224 );
225 }
226
227 unsafe { Ok(Self::new_unchecked(new_chunks, self.dtype().clone())) }
228 }
229
230 pub unsafe fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
236 let len = chunks.iter().map(|chunk| chunk.len()).sum();
237 let chunk_offsets = ChunkedData::compute_chunk_offsets(&chunks);
238 unsafe {
239 Array::from_parts_unchecked(
240 ArrayParts::new(
241 Chunked,
242 dtype,
243 len,
244 ChunkedData {
245 chunk_offsets: chunk_offsets.clone(),
246 },
247 )
248 .with_slots(ChunkedData::make_slots(&chunk_offsets, &chunks)),
249 )
250 }
251 }
252}
253
254impl FromIterator<ArrayRef> for Array<Chunked> {
255 fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
256 let chunks: Vec<ArrayRef> = iter.into_iter().collect();
257 let dtype = chunks
258 .first()
259 .map(|c| c.dtype().clone())
260 .vortex_expect("Cannot infer DType from an empty iterator");
261 Array::<Chunked>::try_new(chunks, dtype)
262 .vortex_expect("Failed to create chunked array from iterator")
263 }
264}
265
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::arrays::ChunkedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::chunked::ChunkedArrayExt;
    use crate::assert_arrays_eq;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;
    use crate::validity::Validity;

    // Shorthand for the non-nullable u64 dtype used by the rechunk tests.
    fn u64_non_nullable() -> DType {
        DType::Primitive(PType::U64, Nullability::NonNullable)
    }

    // Shorthand for the nullable u64 dtype used by the validity tests.
    fn u64_nullable() -> DType {
        DType::Primitive(PType::U64, Nullability::Nullable)
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked =
            ChunkedArray::try_new(vec![buffer![0u64].into_array()], u64_non_nullable()).unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        // A single small chunk is left untouched.
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            u64_non_nullable(),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        // Two tiny chunks fit comfortably under both targets, so they merge into one.
        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            u64_non_nullable(),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // Combining the chunks would reach 6 elements, so the layout is preserved.
        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.iter_chunks().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            u64_non_nullable(),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // The oversized 6-element chunk passes through unsplit; its neighbours combine.
        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
        ];

        let chunked = ChunkedArray::try_new(chunks, u64_nullable())?;

        // Empty chunks must not affect validity aggregation.
        assert!(chunked.all_valid().unwrap());
        assert!(!chunked.into_array().all_invalid().unwrap());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];

        let chunked = ChunkedArray::try_new(chunks, u64_nullable())?;

        assert!(!chunked.all_valid().unwrap());
        assert!(chunked.into_array().all_invalid().unwrap());

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(),
        ];

        let chunked = ChunkedArray::try_new(chunks, u64_nullable())?;

        // Mixed validity: neither all-valid nor all-invalid.
        assert!(!chunked.all_valid().unwrap());
        assert!(!chunked.into_array().all_invalid().unwrap());

        Ok(())
    }
}