// vortex_array/arrays/chunked/array.rs

use std::fmt::Debug;

use futures_util::stream;
use itertools::Itertools;
use vortex_buffer::{Buffer, BufferMut};
use vortex_dtype::DType;
use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
use vortex_mask::Mask;

use crate::arrays::ChunkedVTable;
use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
use crate::search_sorted::{SearchSorted, SearchSortedSide};
use crate::stats::{ArrayStats, StatsSetRef};
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::vtable::{ArrayVTable, ValidityVTable};
use crate::{Array, ArrayRef, IntoArray};

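/// An array that stores its rows as a sequence of chunks, all sharing the same [`DType`].
///
/// Chunk boundaries are kept in `chunk_offsets`, a buffer of `nchunks + 1` cumulative row
/// offsets starting at 0, which lets a row index be mapped to its chunk via binary search.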
#[derive(Clone, Debug)]
pub struct ChunkedArray {
    dtype: DType,
    len: usize,
    chunk_offsets: Buffer<u64>,
    chunks: Vec<ArrayRef>,
    stats_set: ArrayStats,
}

impl ChunkedArray {
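    /// Creates a `ChunkedArray` from `chunks`, returning an error if any chunk's `DType`
    /// does not match `dtype`.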
    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
        for chunk in &chunks {
            if chunk.dtype() != &dtype {
                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
            }
        }

        Ok(Self::new_unchecked(chunks, dtype))
    }

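    /// Creates a `ChunkedArray` without validating that every chunk matches `dtype`; the
    /// cumulative chunk offsets are still computed here.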
    pub fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
        let nchunks = chunks.len();

        let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
        unsafe { chunk_offsets.push_unchecked(0) }
        let mut curr_offset = 0;
        for c in &chunks {
            curr_offset += c.len() as u64;
            unsafe { chunk_offsets.push_unchecked(curr_offset) }
        }
        assert_eq!(chunk_offsets.len(), nchunks + 1);

        Self {
            dtype,
            len: curr_offset.try_into().vortex_unwrap(),
            chunk_offsets: chunk_offsets.freeze(),
            chunks,
            stats_set: Default::default(),
        }
    }

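    /// Returns a reference to the chunk at `idx`, or an error if `idx` is out of bounds.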
    #[inline]
    pub fn chunk(&self, idx: usize) -> VortexResult<&ArrayRef> {
        if idx >= self.nchunks() {
            vortex_bail!("chunk index {} >= num chunks ({})", idx, self.nchunks());
        }
        Ok(&self.chunks[idx])
    }

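    /// Returns the number of chunks in this array.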
    pub fn nchunks(&self) -> usize {
        self.chunks.len()
    }

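    /// Returns the cumulative row offsets of the chunks. The buffer has `nchunks() + 1`
    /// entries and always starts at 0.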
    #[inline]
    pub fn chunk_offsets(&self) -> &Buffer<u64> {
        &self.chunk_offsets
    }

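    /// Resolves a row `index` to `(chunk_index, index_within_chunk)`.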
    pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
        assert!(index <= self.len(), "Index out of bounds of the array");
        let index = index as u64;

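        // Search from the right so that, when empty chunks produce duplicate offsets, the
        // index resolves to the chunk that actually contains the row; subtract 1 because
        // the offsets buffer starts with a leading 0.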
        let index_chunk = self
            .chunk_offsets()
            .search_sorted(&index, SearchSortedSide::Right)
            .to_ends_index(self.nchunks() + 1)
            .saturating_sub(1);
        let chunk_start = self.chunk_offsets()[index_chunk];

        let index_in_chunk =
            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
        (index_chunk, index_in_chunk)
    }

    pub fn chunks(&self) -> &[ArrayRef] {
        &self.chunks
    }

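    /// Iterates over the chunks, skipping any that are empty.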
    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
        self.chunks().iter().filter(|c| !c.is_empty())
    }

    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
    }

    pub fn array_stream(&self) -> impl ArrayStream + '_ {
        ArrayStreamAdapter::new(
            self.dtype().clone(),
            stream::iter(self.chunks().iter().cloned().map(Ok)),
        )
    }

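    /// Combines adjacent chunks so that each combined chunk stays within `target_bytesize`
    /// bytes and `target_rowsize` rows. Chunks that individually exceed either target are
    /// passed through unchanged.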
    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
        let mut new_chunks = Vec::new();
        let mut chunks_to_combine = Vec::new();
        let mut new_chunk_n_bytes = 0;
        let mut new_chunk_n_elements = 0;
        for chunk in self.chunks() {
            let n_bytes = chunk.nbytes();
            let n_elements = chunk.len();

            if (new_chunk_n_bytes + n_bytes > target_bytesize
                || new_chunk_n_elements + n_elements > target_rowsize)
                && !chunks_to_combine.is_empty()
            {
                new_chunks.push(
                    ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
                        .to_canonical()?
                        .into_array(),
                );

                new_chunk_n_bytes = 0;
                new_chunk_n_elements = 0;
                chunks_to_combine = Vec::new();
            }

            if n_bytes > target_bytesize || n_elements > target_rowsize {
                new_chunks.push(chunk.clone());
            } else {
                new_chunk_n_bytes += n_bytes;
                new_chunk_n_elements += n_elements;
                chunks_to_combine.push(chunk.clone());
            }
        }

        if !chunks_to_combine.is_empty() {
            new_chunks.push(
                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
                    .to_canonical()?
                    .into_array(),
            );
        }

        Ok(Self::new_unchecked(new_chunks, self.dtype().clone()))
    }
}

impl FromIterator<ArrayRef> for ChunkedArray {
    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
        let dtype = chunks
            .first()
            .map(|c| c.dtype().clone())
            .vortex_expect("Cannot infer DType from an empty iterator");
        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
    }
}

impl ArrayVTable<ChunkedVTable> for ChunkedVTable {
    fn len(array: &ChunkedArray) -> usize {
        array.len
    }

    fn dtype(array: &ChunkedArray) -> &DType {
        &array.dtype
    }

    fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
        array.stats_set.to_ref(array.as_ref())
    }
}

impl ValidityVTable<ChunkedVTable> for ChunkedVTable {
    fn is_valid(array: &ChunkedArray, index: usize) -> VortexResult<bool> {
        if !array.dtype.is_nullable() {
            return Ok(true);
        }
        let (chunk, offset_in_chunk) = array.find_chunk_idx(index);
        array.chunk(chunk)?.is_valid(offset_in_chunk)
    }

    fn all_valid(array: &ChunkedArray) -> VortexResult<bool> {
        if !array.dtype().is_nullable() {
            return Ok(true);
        }
        for chunk in array.chunks() {
            if !chunk.all_valid()? {
                return Ok(false);
            }
        }
        Ok(true)
    }

    fn all_invalid(array: &ChunkedArray) -> VortexResult<bool> {
        if !array.dtype().is_nullable() {
            return Ok(false);
        }
        for chunk in array.chunks() {
            if !chunk.all_invalid()? {
                return Ok(false);
            }
        }
        Ok(true)
    }

    fn validity_mask(array: &ChunkedArray) -> VortexResult<Mask> {
        array
            .chunks()
            .iter()
            .map(|a| a.validity_mask())
            .try_collect()
    }
}

#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::ChunkedVTable;
    use crate::arrays::chunked::ChunkedArray;
    use crate::compute::conformance::binary_numeric::test_numeric;
    use crate::compute::{cast, sub_scalar};
    use crate::{IntoArray, ToCanonical, assert_arrays_eq};

    fn chunked_array() -> ChunkedArray {
        ChunkedArray::try_new(
            vec![
                buffer![1u64, 2, 3].into_array(),
                buffer![4u64, 5, 6].into_array(),
                buffer![7u64, 8, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap()
    }

    #[test]
    fn test_scalar_subtract() {
        let chunked = chunked_array().into_array();
        let to_subtract = 1u64;
        let array = sub_scalar(&chunked, to_subtract.into()).unwrap();

        let chunked = array.as_::<ChunkedVTable>();
        let chunks_out = chunked.chunks();

        let results = chunks_out[0]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[0u64, 1, 2]);
        let results = chunks_out[1]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[3u64, 4, 5]);
        let results = chunks_out[2]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[6u64, 7, 8]);
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_chunked_binary_numeric() {
        let array = chunked_array();
        let signed_dtype = DType::from(PType::try_from(array.dtype()).unwrap().to_signed());
        let array = cast(array.as_ref(), &signed_dtype).unwrap();
        test_numeric::<u64>(array)
    }
}