// vortex_array/arrays/chunked/canonical.rs

use arrow_buffer::BooleanBufferBuilder;
use vortex_buffer::BufferMut;
use vortex_dtype::{DType, NativePType, Nullability, PType, StructDType, match_each_native_ptype};
use vortex_error::{VortexExpect, VortexResult, vortex_err};

use crate::array::ArrayCanonicalImpl;
use crate::arrays::chunked::ChunkedArray;
use crate::arrays::extension::ExtensionArray;
use crate::arrays::null::NullArray;
use crate::arrays::primitive::PrimitiveArray;
use crate::arrays::struct_::StructArray;
use crate::arrays::{BoolArray, ListArray, VarBinViewArray};
use crate::builders::ArrayBuilder;
use crate::compute::{scalar_at, slice, try_cast};
use crate::validity::Validity;
use crate::{Array, ArrayRef, ArrayVariants, Canonical, ToCanonical};

impl ArrayCanonicalImpl for ChunkedArray {
    fn _to_canonical(&self) -> VortexResult<Canonical> {
        let validity = Validity::copy_from_array(self)?;
        try_canonicalize_chunks(self.chunks(), validity, self.dtype())
    }

    fn _append_to_builder(&self, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
        for chunk in self.chunks() {
            chunk.append_to_builder(builder)?;
        }
        Ok(())
    }
}
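
/// Canonicalize a sequence of chunks into a single array of the given `dtype`,
/// dispatching on the dtype to the matching packing routine below.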
pub(crate) fn try_canonicalize_chunks(
    chunks: &[ArrayRef],
    validity: Validity,
    dtype: &DType,
) -> VortexResult<Canonical> {
    match dtype {
        DType::Struct(struct_dtype, _) => {
            let struct_array = swizzle_struct_chunks(chunks, validity, struct_dtype)?;
            Ok(Canonical::Struct(struct_array))
        }

        DType::Extension(ext_dtype) => {
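            // Unwrap each chunk to its storage array and re-wrap the chunked storage
            // in a single ExtensionArray; the storage stays chunked rather than being
            // recursively canonicalized here.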
            let storage_chunks: Vec<ArrayRef> = chunks
                .iter()
                .map(|chunk| {
                    chunk
                        .clone()
                        .as_extension_typed()
                        .vortex_expect("Chunk could not be downcast to ExtensionArrayTrait")
                        .storage_data()
                })
                .collect();
            let storage_dtype = ext_dtype.storage_dtype().clone();
            let chunked_storage =
                ChunkedArray::try_new(storage_chunks, storage_dtype)?.into_array();

            Ok(Canonical::Extension(ExtensionArray::new(
                ext_dtype.clone(),
                chunked_storage,
            )))
        }

        DType::List(..) => {
            let list = pack_lists(chunks, validity, dtype)?;
            Ok(Canonical::List(list))
        }

        DType::Bool(_) => {
            let bool_array = pack_bools(chunks, validity)?;
            Ok(Canonical::Bool(bool_array))
        }
        DType::Primitive(ptype, _) => {
            match_each_native_ptype!(ptype, |$P| {
                let prim_array = pack_primitives::<$P>(chunks, validity)?;
                Ok(Canonical::Primitive(prim_array))
            })
        }
        DType::Utf8(_) => {
            let varbin_array = pack_views(chunks, dtype, validity)?;
            Ok(Canonical::VarBinView(varbin_array))
        }
        DType::Binary(_) => {
            let varbin_array = pack_views(chunks, dtype, validity)?;
            Ok(Canonical::VarBinView(varbin_array))
        }
        DType::Null => {
            let len = chunks.iter().map(|chunk| chunk.len()).sum();
            let null_array = NullArray::new(len);
            Ok(Canonical::Null(null_array))
        }
    }
}
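
/// Concatenate list chunks into a single `ListArray`: each chunk's element array is
/// sliced to the range it actually references, the slices are collected into a chunked
/// elements array, and the per-chunk offsets are rebased into one contiguous `i64`
/// offsets buffer.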
fn pack_lists(chunks: &[ArrayRef], validity: Validity, dtype: &DType) -> VortexResult<ListArray> {
    let len: usize = chunks.iter().map(|c| c.len()).sum();
    let mut offsets = BufferMut::<i64>::with_capacity(len + 1);
    offsets.push(0);
    let mut elements = Vec::new();
    let elem_dtype = dtype
        .as_list_element()
        .vortex_expect("ListArray must have List dtype");

    for chunk in chunks {
        let chunk = chunk.to_list()?;
        let offsets_arr = try_cast(
            chunk.offsets(),
            &DType::Primitive(PType::I64, Nullability::NonNullable),
        )?
        .to_primitive()?;

        let first_offset_value: usize = usize::try_from(&scalar_at(&offsets_arr, 0)?)?;
        let last_offset_value: usize =
            usize::try_from(&scalar_at(&offsets_arr, offsets_arr.len() - 1)?)?;
        elements.push(slice(
            chunk.elements(),
            first_offset_value,
            last_offset_value,
        )?);
        let adjustment_from_previous = *offsets
            .last()
            .ok_or_else(|| vortex_err!("List offsets must have at least one element"))?;
        offsets.extend(
            offsets_arr
                .as_slice::<i64>()
                .iter()
                .skip(1)
                .map(|off| off + adjustment_from_previous - first_offset_value as i64),
        );
    }
    let chunked_elements = ChunkedArray::try_new(elements, elem_dtype.clone())?.into_array();
    let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable);

    ListArray::try_new(chunked_elements, offsets.into_array(), validity)
}
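
/// Transpose chunks of structs into a struct of chunked fields: for every field of the
/// struct dtype, gather that field from each chunk into a `ChunkedArray` column.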
fn swizzle_struct_chunks(
    chunks: &[ArrayRef],
    validity: Validity,
    struct_dtype: &StructDType,
) -> VortexResult<StructArray> {
    let len = chunks.iter().map(|chunk| chunk.len()).sum();
    let mut field_arrays = Vec::new();

    for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
        let field_chunks = chunks
            .iter()
            .map(|c| {
                c.as_struct_typed()
                    .vortex_expect("Chunk was not a StructArray")
                    .maybe_null_field_by_idx(field_idx)
                    .vortex_expect("Invalid chunked array")
            })
            .collect::<Vec<_>>();
        let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
        field_arrays.push(field_array.into_array());
    }

    StructArray::try_new(struct_dtype.names().clone(), field_arrays, len, validity)
}
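
/// Concatenate boolean chunks into a single `BoolArray` backed by one boolean buffer.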
fn pack_bools(chunks: &[ArrayRef], validity: Validity) -> VortexResult<BoolArray> {
    let len = chunks.iter().map(|chunk| chunk.len()).sum();
    let mut buffer = BooleanBufferBuilder::new(len);
    for chunk in chunks {
        let chunk = chunk.to_bool()?;
        buffer.append_buffer(chunk.boolean_buffer());
    }
    Ok(BoolArray::new(buffer.finish(), validity))
}
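
/// Concatenate primitive chunks into one contiguous `PrimitiveArray` of native type `T`.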
fn pack_primitives<T: NativePType>(
    chunks: &[ArrayRef],
    validity: Validity,
) -> VortexResult<PrimitiveArray> {
    let total_len = chunks.iter().map(|a| a.len()).sum();
    let mut buffer = BufferMut::with_capacity(total_len);
    for chunk in chunks {
        let chunk = chunk.to_primitive()?;
        buffer.extend_from_slice(chunk.as_slice::<T>());
    }
    Ok(PrimitiveArray::new(buffer.freeze(), validity))
}
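
/// Concatenate UTF-8/binary chunks into a single `VarBinViewArray`, copying each
/// chunk's views and re-pointing them at the combined buffer list.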
fn pack_views(
    chunks: &[ArrayRef],
    dtype: &DType,
    validity: Validity,
) -> VortexResult<VarBinViewArray> {
    let total_len = chunks.iter().map(|a| a.len()).sum();
    let mut views = BufferMut::with_capacity(total_len);
    let mut buffers = Vec::new();
    for chunk in chunks {
        let buffers_offset = u32::try_from(buffers.len())?;
        let canonical_chunk = chunk.to_varbinview()?;
        buffers.extend(canonical_chunk.buffers().iter().cloned());
        views.extend(
            canonical_chunk
                .views()
                .iter()
                .map(|view| view.offset_view(buffers_offset)),
        );
    }

    VarBinViewArray::try_new(views.freeze(), buffers, dtype.clone(), validity)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_dtype::DType;
    use vortex_dtype::DType::{List, Primitive};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::array::Array;
    use crate::arrays::chunked::canonical::pack_views;
    use crate::arrays::{ChunkedArray, ListArray, PrimitiveArray, StructArray, VarBinViewArray};
    use crate::compute::{scalar_at, slice};
    use crate::validity::Validity;
    use crate::variants::StructArrayTrait;

    fn stringview_array() -> VarBinViewArray {
        VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"])
    }

    #[test]
    pub fn pack_sliced_varbin() {
        let array1 = slice(&stringview_array(), 1, 3).unwrap();
        let array2 = slice(&stringview_array(), 2, 4).unwrap();
        let packed = pack_views(
            &[array1, array2],
            &DType::Utf8(NonNullable),
            Validity::NonNullable,
        )
        .unwrap();
        assert_eq!(packed.len(), 4);
        let values = packed
            .with_iterator(|iter| {
                iter.flatten()
                    .map(|v| unsafe { String::from_utf8_unchecked(v.to_vec()) })
                    .collect::<Vec<_>>()
            })
            .unwrap();
        assert_eq!(values, &["bar", "baz", "baz", "quak"]);
    }

    #[test]
    pub fn pack_nested_structs() {
        let struct_array = StructArray::try_new(
            vec!["a".into()].into(),
            vec![stringview_array().into_array()],
            4,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = struct_array.dtype().clone();
        let chunked = ChunkedArray::try_new(
            vec![
                ChunkedArray::try_new(vec![struct_array.to_array()], dtype.clone())
                    .unwrap()
                    .into_array(),
            ],
            dtype,
        )
        .unwrap()
        .into_array();
        let canonical_struct = chunked.to_struct().unwrap();
        let canonical_varbin = canonical_struct
            .maybe_null_field_by_idx(0)
            .unwrap()
            .to_varbinview()
            .unwrap();
        let original_varbin = struct_array
            .maybe_null_field_by_idx(0)
            .unwrap()
            .to_varbinview()
            .unwrap();
        let orig_values = original_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let canon_values = canonical_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        assert_eq!(orig_values, canon_values);
    }

    #[test]
    pub fn pack_nested_lists() {
        let l1 = ListArray::try_new(
            PrimitiveArray::from_iter([1, 2, 3, 4]).into_array(),
            PrimitiveArray::from_iter([0, 3]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let l2 = ListArray::try_new(
            PrimitiveArray::from_iter([5, 6]).into_array(),
            PrimitiveArray::from_iter([0, 2]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let chunked_list = ChunkedArray::try_new(
            vec![l1.clone().into_array(), l2.clone().into_array()],
            List(Arc::new(Primitive(I32, NonNullable)), NonNullable),
        );

        let canon_values = chunked_list.unwrap().to_list().unwrap();

        assert_eq!(
            scalar_at(&l1, 0).unwrap(),
            scalar_at(&canon_values, 0).unwrap()
        );
        assert_eq!(
            scalar_at(&l2, 0).unwrap(),
            scalar_at(&canon_values, 1).unwrap()
        );
    }
}