vortex_array/arrays/chunked/
array.rs1use std::fmt::Debug;
6
7use futures_util::stream;
8use itertools::Itertools;
9use vortex_buffer::{Buffer, BufferMut};
10use vortex_dtype::DType;
11use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
12use vortex_mask::Mask;
13
14use crate::arrays::ChunkedVTable;
15use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
16use crate::search_sorted::{SearchSorted, SearchSortedSide};
17use crate::stats::{ArrayStats, StatsSetRef};
18use crate::stream::{ArrayStream, ArrayStreamAdapter};
19use crate::vtable::{ArrayVTable, ValidityVTable};
20use crate::{Array, ArrayRef, IntoArray};
21
22#[derive(Clone, Debug)]
23pub struct ChunkedArray {
24 dtype: DType,
25 len: usize,
26 chunk_offsets: Buffer<u64>,
27 chunks: Vec<ArrayRef>,
28 stats_set: ArrayStats,
29}
30
31impl ChunkedArray {
32 pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
33 for chunk in &chunks {
34 if chunk.dtype() != &dtype {
35 vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
36 }
37 }
38
39 Ok(Self::new_unchecked(chunks, dtype))
40 }
41
42 pub fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
43 let nchunks = chunks.len();
44
45 let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
46 unsafe { chunk_offsets.push_unchecked(0) }
47 let mut curr_offset = 0;
48 for c in &chunks {
49 curr_offset += c.len() as u64;
50 unsafe { chunk_offsets.push_unchecked(curr_offset) }
51 }
52 assert_eq!(chunk_offsets.len(), nchunks + 1);
53
54 Self {
55 dtype,
56 len: curr_offset.try_into().vortex_unwrap(),
57 chunk_offsets: chunk_offsets.freeze(),
58 chunks,
59 stats_set: Default::default(),
60 }
61 }
62
63 #[inline]
65 pub fn chunk(&self, idx: usize) -> VortexResult<&ArrayRef> {
66 if idx >= self.nchunks() {
67 vortex_bail!("chunk index {} > num chunks ({})", idx, self.nchunks());
68 }
69 Ok(&self.chunks[idx])
70 }
71
72 pub fn nchunks(&self) -> usize {
73 self.chunks.len()
74 }
75
76 #[inline]
77 pub fn chunk_offsets(&self) -> &Buffer<u64> {
78 &self.chunk_offsets
79 }
80
81 pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
82 assert!(index <= self.len(), "Index out of bounds of the array");
83 let index = index as u64;
84
85 let index_chunk = self
88 .chunk_offsets()
89 .search_sorted(&index, SearchSortedSide::Right)
90 .to_ends_index(self.nchunks() + 1)
91 .saturating_sub(1);
92 let chunk_start = self.chunk_offsets()[index_chunk];
93
94 let index_in_chunk =
95 usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
96 (index_chunk, index_in_chunk)
97 }
98
99 pub fn chunks(&self) -> &[ArrayRef] {
100 &self.chunks
101 }
102
103 pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
104 self.chunks().iter().filter(|c| !c.is_empty())
105 }
106
107 pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
108 ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
109 }
110
111 pub fn array_stream(&self) -> impl ArrayStream + '_ {
112 ArrayStreamAdapter::new(
113 self.dtype().clone(),
114 stream::iter(self.chunks().iter().cloned().map(Ok)),
115 )
116 }
117
118 pub fn rechunk(&self, target_bytesize: usize, target_rowsize: usize) -> VortexResult<Self> {
119 let mut new_chunks = Vec::new();
120 let mut chunks_to_combine = Vec::new();
121 let mut new_chunk_n_bytes = 0;
122 let mut new_chunk_n_elements = 0;
123 for chunk in self.chunks() {
124 let n_bytes = chunk.nbytes();
125 let n_elements = chunk.len();
126
127 if (new_chunk_n_bytes + n_bytes > target_bytesize
128 || new_chunk_n_elements + n_elements > target_rowsize)
129 && !chunks_to_combine.is_empty()
130 {
131 new_chunks.push(
132 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
133 .to_canonical()?
134 .into_array(),
135 );
136
137 new_chunk_n_bytes = 0;
138 new_chunk_n_elements = 0;
139 chunks_to_combine = Vec::new();
140 }
141
142 if n_bytes > target_bytesize || n_elements > target_rowsize {
143 new_chunks.push(chunk.clone());
144 } else {
145 new_chunk_n_bytes += n_bytes;
146 new_chunk_n_elements += n_elements;
147 chunks_to_combine.push(chunk.clone());
148 }
149 }
150
151 if !chunks_to_combine.is_empty() {
152 new_chunks.push(
153 ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
154 .to_canonical()?
155 .into_array(),
156 );
157 }
158
159 Ok(Self::new_unchecked(new_chunks, self.dtype().clone()))
160 }
161}
162
163impl FromIterator<ArrayRef> for ChunkedArray {
164 fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
165 let chunks: Vec<ArrayRef> = iter.into_iter().collect();
166 let dtype = chunks
167 .first()
168 .map(|c| c.dtype().clone())
169 .vortex_expect("Cannot infer DType from an empty iterator");
170 Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
171 }
172}
173
/// Basic array accessors for `ChunkedArray`, all served from fields stored on the array.
impl ArrayVTable<ChunkedVTable> for ChunkedVTable {
    /// Returns the total element count stored in the array's `len` field.
    fn len(array: &ChunkedArray) -> usize {
        array.len
    }

    /// Returns the dtype stored on the array.
    fn dtype(array: &ChunkedArray) -> &DType {
        &array.dtype
    }

    /// Returns a reference view over the array's stats set, tied to this array.
    fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
        array.stats_set.to_ref(array.as_ref())
    }
}
187
188impl ValidityVTable<ChunkedVTable> for ChunkedVTable {
189 fn is_valid(array: &ChunkedArray, index: usize) -> VortexResult<bool> {
190 if !array.dtype.is_nullable() {
191 return Ok(true);
192 }
193 let (chunk, offset_in_chunk) = array.find_chunk_idx(index);
194 array.chunk(chunk)?.is_valid(offset_in_chunk)
195 }
196
197 fn all_valid(array: &ChunkedArray) -> VortexResult<bool> {
198 if !array.dtype().is_nullable() {
199 return Ok(true);
200 }
201 for chunk in array.chunks() {
202 if !chunk.all_valid()? {
203 return Ok(false);
204 }
205 }
206 Ok(true)
207 }
208
209 fn all_invalid(array: &ChunkedArray) -> VortexResult<bool> {
210 if !array.dtype().is_nullable() {
211 return Ok(false);
212 }
213 for chunk in array.chunks() {
214 if !chunk.all_invalid()? {
215 return Ok(false);
216 }
217 }
218 Ok(true)
219 }
220
221 fn validity_mask(array: &ChunkedArray) -> VortexResult<Mask> {
222 array
223 .chunks()
224 .iter()
225 .map(|a| a.validity_mask())
226 .try_collect()
227 }
228}
229
230#[cfg(test)]
231mod test {
232 use vortex_buffer::buffer;
233 use vortex_dtype::{DType, Nullability, PType};
234 use vortex_error::VortexResult;
235
236 use crate::array::Array;
237 use crate::arrays::ChunkedVTable;
238 use crate::arrays::chunked::ChunkedArray;
239 use crate::compute::conformance::binary_numeric::test_numeric;
240 use crate::compute::{cast, sub_scalar};
241 use crate::{IntoArray, ToCanonical, assert_arrays_eq};
242
243 fn chunked_array() -> ChunkedArray {
244 ChunkedArray::try_new(
245 vec![
246 buffer![1u64, 2, 3].into_array(),
247 buffer![4u64, 5, 6].into_array(),
248 buffer![7u64, 8, 9].into_array(),
249 ],
250 DType::Primitive(PType::U64, Nullability::NonNullable),
251 )
252 .unwrap()
253 }
254
/// Subtracting a scalar from a chunked array keeps the chunking and subtracts
/// from every element.
#[test]
fn test_scalar_subtract() {
    let chunked = chunked_array().into_array();
    let to_subtract = 1u64;
    let array = sub_scalar(&chunked, to_subtract.into()).unwrap();

    let chunks_out = array.as_::<ChunkedVTable>().chunks();

    let expected: [[u64; 3]; 3] = [[0, 1, 2], [3, 4, 5], [6, 7, 8]];
    // Index explicitly so a missing output chunk still panics.
    for (idx, want) in expected.iter().enumerate() {
        let got = chunks_out[idx]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(got, want);
    }
}
283
/// Rechunking a single small chunk leaves the contents unchanged.
#[test]
fn test_rechunk_one_chunk() {
    let dtype = DType::Primitive(PType::U64, Nullability::NonNullable);
    let chunks = vec![buffer![0u64].into_array()];
    let chunked = ChunkedArray::try_new(chunks, dtype).unwrap();

    let target = 1 << 16;
    let rechunked = chunked.rechunk(target, target).unwrap();

    assert_arrays_eq!(chunked, rechunked);
}
296
/// Two small chunks that fit within both targets are merged into one.
#[test]
fn test_rechunk_two_chunks() {
    let dtype = DType::Primitive(PType::U64, Nullability::NonNullable);
    let chunks = vec![buffer![0u64].into_array(), buffer![5u64].into_array()];
    let chunked = ChunkedArray::try_new(chunks, dtype).unwrap();

    let target = 1 << 16;
    let rechunked = chunked.rechunk(target, target).unwrap();

    assert_eq!(rechunked.nchunks(), 1);
    assert_arrays_eq!(chunked, rechunked);
}
310
/// With a row target of 5, chunks of 4 and 2 rows cannot be merged and stay separate.
#[test]
fn test_rechunk_tiny_target_chunks() {
    let dtype = DType::Primitive(PType::U64, Nullability::NonNullable);
    let chunks = vec![
        buffer![0u64, 1, 2, 3].into_array(),
        buffer![4u64, 5].into_array(),
    ];
    let chunked = ChunkedArray::try_new(chunks, dtype).unwrap();

    let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

    assert_eq!(rechunked.nchunks(), 2);
    for chunk in rechunked.chunks() {
        assert!(chunk.len() < 5);
    }
    assert_arrays_eq!(chunked, rechunked);
}
328
/// A chunk larger than the row target is passed through on its own, while the
/// small chunks around it are merged where the target allows.
#[test]
fn test_rechunk_with_too_big_chunk() {
    let dtype = DType::Primitive(PType::U64, Nullability::NonNullable);
    let chunks = vec![
        buffer![0u64, 1, 2].into_array(),
        buffer![42_u64; 6].into_array(),
        buffer![4u64, 5].into_array(),
        buffer![6u64, 7].into_array(),
        buffer![8u64, 9].into_array(),
    ];
    let chunked = ChunkedArray::try_new(chunks, dtype).unwrap();

    let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

    // Expected layout: [0,1,2] | [42; 6] | [4,5,6,7] | [8,9].
    assert_eq!(rechunked.nchunks(), 4);
    assert_arrays_eq!(chunked, rechunked);
}
349
/// Runs the shared binary-numeric conformance checks against a chunked array
/// that has been cast to the signed counterpart of its primitive type.
#[test]
fn test_chunked_binary_numeric() {
    let unsigned = chunked_array();
    let unsigned_ptype = PType::try_from(unsigned.dtype()).unwrap();
    let signed_dtype = DType::from(unsigned_ptype.to_signed());
    let signed = cast(unsigned.as_ref(), &signed_dtype).unwrap();
    // NOTE(review): the array is cast to a signed dtype but `test_numeric` is
    // instantiated with `u64` — confirm this pairing is intended.
    test_numeric::<u64>(signed)
}
358}