vortex_array/arrays/chunked/mod.rs

use std::fmt::Debug;

use futures_util::stream;
use itertools::Itertools;
use vortex_buffer::{Buffer, BufferMut};
use vortex_dtype::DType;
use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
use vortex_mask::Mask;

use crate::array::ArrayValidityImpl;
use crate::compute::{SearchSorted, SearchSortedSide};
use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
use crate::nbytes::NBytes;
use crate::stats::{ArrayStats, StatsSetRef};
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::validity::Validity;
use crate::vtable::{EncodingVTable, VTableRef};
use crate::{
    Array, ArrayImpl, ArrayRef, ArrayStatisticsImpl, EmptyMetadata, Encoding, EncodingId, IntoArray,
};

mod canonical;
mod compute;
mod serde;
mod stats;
mod variants;
#[derive(Clone, Debug)]
pub struct ChunkedArray {
    dtype: DType,
    len: usize,
    chunk_offsets: Buffer<u64>,
    chunks: Vec<ArrayRef>,
    stats_set: ArrayStats,
}

pub struct ChunkedEncoding;
impl Encoding for ChunkedEncoding {
    type Array = ChunkedArray;
    type Metadata = EmptyMetadata;
}

impl EncodingVTable for ChunkedEncoding {
    fn id(&self) -> EncodingId {
        EncodingId::new_ref("vortex.chunked")
    }
}

impl ChunkedArray {
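    /// Create a chunked array from `chunks`, validating that every chunk has
    /// the expected `dtype`.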
    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
        for chunk in &chunks {
            if chunk.dtype() != &dtype {
                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
            }
        }

        Ok(Self::new_unchecked(chunks, dtype))
    }

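    /// Create a chunked array without checking that the chunks match `dtype`.
    /// The cumulative `chunk_offsets` (and the total length) are computed here
    /// from the chunk lengths.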
    pub fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
        let nchunks = chunks.len();

        let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
        unsafe { chunk_offsets.push_unchecked(0) }
        let mut curr_offset = 0;
        for c in &chunks {
            curr_offset += c.len() as u64;
            unsafe { chunk_offsets.push_unchecked(curr_offset) }
        }
        assert_eq!(chunk_offsets.len(), nchunks + 1);

        Self {
            dtype,
            len: curr_offset.try_into().vortex_unwrap(),
            chunk_offsets: chunk_offsets.freeze(),
            chunks,
            stats_set: Default::default(),
        }
    }

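    /// Fetch the chunk at `idx`, or return an error if `idx` is out of bounds.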
    #[inline]
    pub fn chunk(&self, idx: usize) -> VortexResult<&ArrayRef> {
        if idx >= self.nchunks() {
            vortex_bail!("chunk index {} >= num chunks ({})", idx, self.nchunks());
        }
        Ok(&self.chunks[idx])
    }

    pub fn nchunks(&self) -> usize {
        self.chunks.len()
    }

    #[inline]
    pub fn chunk_offsets(&self) -> &[u64] {
        &self.chunk_offsets
    }

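    /// Map a flat row `index` to `(chunk index, index within that chunk)` by
    /// binary-searching the cumulative `chunk_offsets`. For example, with chunk
    /// lengths `[3, 3, 3]` the offsets are `[0, 3, 6, 9]`, so index `4`
    /// resolves to chunk `1` at offset `1`.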
    fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
        assert!(index <= self.len(), "Index out of bounds of the array");
        let index = index as u64;

        let index_chunk = self
            .chunk_offsets()
            .search_sorted(&index, SearchSortedSide::Right)
            .to_ends_index(self.nchunks() + 1)
            .saturating_sub(1);
        let chunk_start = self.chunk_offsets()[index_chunk];

        let index_in_chunk =
            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
        (index_chunk, index_in_chunk)
    }

    pub fn chunks(&self) -> &[ArrayRef] {
        &self.chunks
    }

    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
        self.chunks().iter().filter(|c| !c.is_empty())
    }

    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
    }

    pub fn array_stream(&self) -> impl ArrayStream + '_ {
        ArrayStreamAdapter::new(
            self.dtype().clone(),
            stream::iter(self.chunks().iter().cloned().map(Ok)),
        )
    }

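    /// Reassemble the array into chunks of at most `target_bytesize` bytes and
    /// `target_rowsize` rows, greedily coalescing runs of small chunks and
    /// passing through any chunk that already exceeds either target.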
    pub fn rechunk(&self, target_bytesize: usize, target_rowsize: usize) -> VortexResult<Self> {
        let mut new_chunks = Vec::new();
        let mut chunks_to_combine = Vec::new();
        let mut new_chunk_n_bytes = 0;
        let mut new_chunk_n_elements = 0;
        for chunk in self.chunks() {
            let n_bytes = chunk.nbytes();
            let n_elements = chunk.len();

            // Flush the pending run of chunks once adding this chunk would exceed
            // either target.
            if (new_chunk_n_bytes + n_bytes > target_bytesize
                || new_chunk_n_elements + n_elements > target_rowsize)
                && !chunks_to_combine.is_empty()
            {
                new_chunks.push(
                    ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
                        .to_canonical()?
                        .into_array(),
                );

                new_chunk_n_bytes = 0;
                new_chunk_n_elements = 0;
                chunks_to_combine = Vec::new();
            }

            // A chunk that already exceeds a target is passed through as-is;
            // anything smaller is accumulated for combining.
            if n_bytes > target_bytesize || n_elements > target_rowsize {
                new_chunks.push(chunk.clone());
            } else {
                new_chunk_n_bytes += n_bytes;
                new_chunk_n_elements += n_elements;
                chunks_to_combine.push(chunk.clone());
            }
        }

        if !chunks_to_combine.is_empty() {
            new_chunks.push(
                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
                    .to_canonical()?
                    .into_array(),
            );
        }

        Ok(Self::new_unchecked(new_chunks, self.dtype().clone()))
    }
}

impl FromIterator<ArrayRef> for ChunkedArray {
    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
        let dtype = chunks
            .first()
            .map(|c| c.dtype().clone())
            .vortex_expect("Cannot infer DType from an empty iterator");
        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
    }
}

impl ArrayImpl for ChunkedArray {
    type Encoding = ChunkedEncoding;

    fn _len(&self) -> usize {
        self.len
    }

    fn _dtype(&self) -> &DType {
        &self.dtype
    }

    fn _vtable(&self) -> VTableRef {
        VTableRef::new_ref(&ChunkedEncoding)
    }
}

impl ArrayStatisticsImpl for ChunkedArray {
    fn _stats_ref(&self) -> StatsSetRef<'_> {
        self.stats_set.to_ref(self)
    }
}

impl ArrayValidityImpl for ChunkedArray {
    fn _is_valid(&self, index: usize) -> VortexResult<bool> {
        if !self.dtype.is_nullable() {
            return Ok(true);
        }
        let (chunk, offset_in_chunk) = self.find_chunk_idx(index);
        self.chunk(chunk)?.is_valid(offset_in_chunk)
    }

    fn _all_valid(&self) -> VortexResult<bool> {
        if !self.dtype().is_nullable() {
            return Ok(true);
        }
        for chunk in self.chunks() {
            if !chunk.all_valid()? {
                return Ok(false);
            }
        }
        Ok(true)
    }

    fn _all_invalid(&self) -> VortexResult<bool> {
        if !self.dtype().is_nullable() {
            return Ok(false);
        }
        for chunk in self.chunks() {
            if !chunk.all_invalid()? {
                return Ok(false);
            }
        }
        Ok(true)
    }

    fn _validity_mask(&self) -> VortexResult<Mask> {
        let validity: Validity = self
            .chunks()
            .iter()
            .map(|a| a.validity_mask())
            .try_collect()?;
        validity.to_logical(self.len())
    }
}

#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::compute::test_harness::test_binary_numeric;
    use crate::compute::{scalar_at, sub_scalar, try_cast};
    use crate::{ArrayExt, IntoArray, ToCanonical, assert_arrays_eq};

    fn chunked_array() -> ChunkedArray {
        ChunkedArray::try_new(
            vec![
                buffer![1u64, 2, 3].into_array(),
                buffer![4u64, 5, 6].into_array(),
                buffer![7u64, 8, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap()
    }

    #[test]
    fn test_scalar_subtract() {
        let chunked = chunked_array().into_array();
        let to_subtract = 1u64;
        let array = sub_scalar(&chunked, to_subtract.into()).unwrap();

        let chunked = array.as_::<ChunkedArray>();
        let chunks_out = chunked.chunks();

        let results = chunks_out[0]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[0u64, 1, 2]);
        let results = chunks_out[1]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[3u64, 4, 5]);
        let results = chunks_out[2]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[6u64, 7, 8]);
    }

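    // An illustrative sketch (not part of the original test suite): chunk
    // offsets are the cumulative chunk lengths, so the three 3-element chunks
    // built by `chunked_array()` should yield offsets [0, 3, 6, 9].
    #[test]
    fn test_chunk_offsets() {
        let chunked = chunked_array();
        assert_eq!(chunked.nchunks(), 3);
        assert_eq!(chunked.len(), 9);
        assert_eq!(chunked.chunk_offsets(), &[0u64, 3, 6, 9]);
    }
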
    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_chunked_binary_numeric() {
        let array = chunked_array();
        // Cast to a signed type before running the generic binary-numeric harness.
        let signed_dtype = DType::from(PType::try_from(array.dtype()).unwrap().to_signed());
        let array = try_cast(&array, &signed_dtype).unwrap();
        test_binary_numeric::<i64>(array)
    }
}