vortex_array/arrays/chunked/mod.rs

use std::fmt::Debug;

use futures_util::stream;
use itertools::Itertools;
use vortex_buffer::{Buffer, BufferMut};
use vortex_dtype::DType;
use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
use vortex_mask::Mask;

use crate::array::ArrayValidityImpl;
use crate::compute::{SearchSorted, SearchSortedSide};
use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
use crate::nbytes::NBytes;
use crate::stats::{ArrayStats, StatsSetRef};
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::vtable::VTableRef;
use crate::{Array, ArrayImpl, ArrayRef, ArrayStatisticsImpl, EmptyMetadata, Encoding, IntoArray};

mod canonical;
mod compute;
mod serde;
mod variants;

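/// An array stored as a sequence of chunks that all share a single [`DType`].
///
/// Cumulative chunk lengths are precomputed in `chunk_offsets` (`nchunks + 1` values, starting
/// at 0 and ending at the total length), so a row index can be mapped to its chunk via binary
/// search.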
#[derive(Clone, Debug)]
pub struct ChunkedArray {
    dtype: DType,
    len: usize,
    chunk_offsets: Buffer<u64>,
    chunks: Vec<ArrayRef>,
    stats_set: ArrayStats,
}

pub struct ChunkedEncoding;
impl Encoding for ChunkedEncoding {
    type Array = ChunkedArray;
    type Metadata = EmptyMetadata;
}

impl ChunkedArray {
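    /// Creates a new `ChunkedArray`, checking that every chunk has the expected `dtype`.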
    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
        for chunk in &chunks {
            if chunk.dtype() != &dtype {
                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
            }
        }

        Ok(Self::new_unchecked(chunks, dtype))
    }

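    /// Creates a new `ChunkedArray` without validating the chunks.
    ///
    /// The caller must ensure that every chunk's dtype matches `dtype`; only the chunk
    /// offsets are computed here.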
    pub fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
        let nchunks = chunks.len();

        let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
        unsafe { chunk_offsets.push_unchecked(0) }
        let mut curr_offset = 0;
        for c in &chunks {
            curr_offset += c.len() as u64;
            unsafe { chunk_offsets.push_unchecked(curr_offset) }
        }
        assert_eq!(chunk_offsets.len(), nchunks + 1);

        Self {
            dtype,
            len: curr_offset.try_into().vortex_unwrap(),
            chunk_offsets: chunk_offsets.freeze(),
            chunks,
            stats_set: Default::default(),
        }
    }

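    /// Returns the chunk at `idx`, or an error if `idx` is out of range.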
    #[inline]
    pub fn chunk(&self, idx: usize) -> VortexResult<&ArrayRef> {
        if idx >= self.nchunks() {
            vortex_bail!("chunk index {} >= num chunks ({})", idx, self.nchunks());
        }
        Ok(&self.chunks[idx])
    }

    pub fn nchunks(&self) -> usize {
        self.chunks.len()
    }

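    /// Returns the cumulative row offsets of the chunks: `nchunks + 1` values, starting at 0
    /// and ending at the total length of the array.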
    #[inline]
    pub fn chunk_offsets(&self) -> &[u64] {
        &self.chunk_offsets
    }

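    /// Maps a row `index` to `(chunk index, index within that chunk)` by binary-searching the
    /// chunk offsets.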
    fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
        assert!(index <= self.len(), "Index out of bounds of the array");
        let index = index as u64;

        let index_chunk = self
            .chunk_offsets()
            .search_sorted(&index, SearchSortedSide::Right)
            .to_ends_index(self.nchunks() + 1)
            .saturating_sub(1);
        let chunk_start = self.chunk_offsets()[index_chunk];

        let index_in_chunk =
            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
        (index_chunk, index_in_chunk)
    }

    pub fn chunks(&self) -> &[ArrayRef] {
        &self.chunks
    }

    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
        self.chunks().iter().filter(|c| !c.is_empty())
    }

    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
    }

    pub fn array_stream(&self) -> impl ArrayStream + '_ {
        ArrayStreamAdapter::new(
            self.dtype().clone(),
            stream::iter(self.chunks().iter().cloned().map(Ok)),
        )
    }

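    /// Combines adjacent chunks into canonicalized chunks of at most `target_bytesize` bytes
    /// and `target_rowsize` rows. Chunks that already exceed either target are passed through
    /// unchanged.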
    pub fn rechunk(&self, target_bytesize: usize, target_rowsize: usize) -> VortexResult<Self> {
        let mut new_chunks = Vec::new();
        let mut chunks_to_combine = Vec::new();
        let mut new_chunk_n_bytes = 0;
        let mut new_chunk_n_elements = 0;
        for chunk in self.chunks() {
            let n_bytes = chunk.nbytes();
            let n_elements = chunk.len();

            if (new_chunk_n_bytes + n_bytes > target_bytesize
                || new_chunk_n_elements + n_elements > target_rowsize)
                && !chunks_to_combine.is_empty()
            {
                new_chunks.push(
                    ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
                        .to_canonical()?
                        .into_array(),
                );

                new_chunk_n_bytes = 0;
                new_chunk_n_elements = 0;
                chunks_to_combine = Vec::new();
            }

            if n_bytes > target_bytesize || n_elements > target_rowsize {
                new_chunks.push(chunk.clone());
            } else {
                new_chunk_n_bytes += n_bytes;
                new_chunk_n_elements += n_elements;
                chunks_to_combine.push(chunk.clone());
            }
        }

        if !chunks_to_combine.is_empty() {
            new_chunks.push(
                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
                    .to_canonical()?
                    .into_array(),
            );
        }

        Ok(Self::new_unchecked(new_chunks, self.dtype().clone()))
    }
}

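/// Builds a `ChunkedArray` from an iterator of chunks, inferring the `DType` from the first
/// chunk. Panics if the iterator is empty or if any chunk has a mismatched dtype.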
impl FromIterator<ArrayRef> for ChunkedArray {
    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
        let dtype = chunks
            .first()
            .map(|c| c.dtype().clone())
            .vortex_expect("Cannot infer DType from an empty iterator");
        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
    }
}

impl ArrayImpl for ChunkedArray {
    type Encoding = ChunkedEncoding;

    fn _len(&self) -> usize {
        self.len
    }

    fn _dtype(&self) -> &DType {
        &self.dtype
    }

    fn _vtable(&self) -> VTableRef {
        VTableRef::new_ref(&ChunkedEncoding)
    }

    fn _with_children(&self, children: &[ArrayRef]) -> VortexResult<Self> {
        Ok(ChunkedArray::new_unchecked(
            children[1..].to_vec(),
            self.dtype.clone(),
        ))
    }
}

impl ArrayStatisticsImpl for ChunkedArray {
    fn _stats_ref(&self) -> StatsSetRef<'_> {
        self.stats_set.to_ref(self)
    }
}

impl ArrayValidityImpl for ChunkedArray {
    fn _is_valid(&self, index: usize) -> VortexResult<bool> {
        if !self.dtype.is_nullable() {
            return Ok(true);
        }
        let (chunk, offset_in_chunk) = self.find_chunk_idx(index);
        self.chunk(chunk)?.is_valid(offset_in_chunk)
    }

    fn _all_valid(&self) -> VortexResult<bool> {
        if !self.dtype().is_nullable() {
            return Ok(true);
        }
        for chunk in self.chunks() {
            if !chunk.all_valid()? {
                return Ok(false);
            }
        }
        Ok(true)
    }

    fn _all_invalid(&self) -> VortexResult<bool> {
        if !self.dtype().is_nullable() {
            return Ok(false);
        }
        for chunk in self.chunks() {
            if !chunk.all_invalid()? {
                return Ok(false);
            }
        }
        Ok(true)
    }

    fn _validity_mask(&self) -> VortexResult<Mask> {
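        // Collect (concatenate) the per-chunk validity masks into a single mask covering the
        // whole array.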
        self.chunks()
            .iter()
            .map(|a| a.validity_mask())
            .try_collect()
    }
}

#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::compute::conformance::binary_numeric::test_binary_numeric;
    use crate::compute::{scalar_at, sub_scalar, try_cast};
    use crate::{ArrayExt, IntoArray, ToCanonical, assert_arrays_eq};

    fn chunked_array() -> ChunkedArray {
        ChunkedArray::try_new(
            vec![
                buffer![1u64, 2, 3].into_array(),
                buffer![4u64, 5, 6].into_array(),
                buffer![7u64, 8, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap()
    }

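    // A small sanity check (not part of the original suite) for `find_chunk_idx`: with three
    // chunks of three rows each, row indices should map to (chunk, offset-within-chunk) pairs.
    #[test]
    fn test_find_chunk_idx() {
        let chunked = chunked_array();
        assert_eq!(chunked.find_chunk_idx(0), (0, 0));
        assert_eq!(chunked.find_chunk_idx(3), (1, 0));
        assert_eq!(chunked.find_chunk_idx(8), (2, 2));
    }
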
    #[test]
    fn test_scalar_subtract() {
        let chunked = chunked_array().into_array();
        let to_subtract = 1u64;
        let array = sub_scalar(&chunked, to_subtract.into()).unwrap();

        let chunked = array.as_::<ChunkedArray>();
        let chunks_out = chunked.chunks();

        let results = chunks_out[0]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[0u64, 1, 2]);
        let results = chunks_out[1]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[3u64, 4, 5]);
        let results = chunks_out[2]
            .to_primitive()
            .unwrap()
            .as_slice::<u64>()
            .to_vec();
        assert_eq!(results, &[6u64, 7, 8]);
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        assert_eq!(rechunked.nchunks(), 2);
        assert!(rechunked.chunks().iter().all(|c| c.len() < 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        assert_eq!(rechunked.nchunks(), 4);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_chunked_binary_numeric() {
        let array = chunked_array();
        let signed_dtype = DType::from(PType::try_from(array.dtype()).unwrap().to_signed());
        let array = try_cast(&array, &signed_dtype).unwrap();
        test_binary_numeric::<i64>(array)
    }
}