vortex_array/arrays/chunked/canonical.rs

use arrow_buffer::BooleanBufferBuilder;
use vortex_buffer::BufferMut;
use vortex_dtype::{DType, NativePType, Nullability, PType, StructDType, match_each_native_ptype};
use vortex_error::{VortexExpect, VortexResult, vortex_err};

use crate::array::ArrayCanonicalImpl;
use crate::arrays::chunked::ChunkedArray;
use crate::arrays::extension::ExtensionArray;
use crate::arrays::null::NullArray;
use crate::arrays::primitive::PrimitiveArray;
use crate::arrays::struct_::StructArray;
use crate::arrays::{BoolArray, ListArray, VarBinViewArray};
use crate::builders::ArrayBuilder;
use crate::compute::{scalar_at, slice, try_cast};
use crate::validity::Validity;
use crate::{Array, ArrayRef, ArrayVariants, Canonical, ToCanonical};

impl ArrayCanonicalImpl for ChunkedArray {
    fn _to_canonical(&self) -> VortexResult<Canonical> {
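        // Gather validity across all chunks up front, then dispatch on the logical dtype.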
        let validity = Validity::copy_from_array(self)?;
        try_canonicalize_chunks(self.chunks(), validity, self.dtype())
    }

    fn _append_to_builder(&self, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
        for chunk in self.chunks() {
            chunk.append_to_builder(builder)?;
        }
        Ok(())
    }
}

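/// Canonicalize a list of chunks that all share `dtype` into a single [Canonical] value.
///
/// Struct and extension chunks have the chunking pushed down into their children without copying
/// data; list chunks keep their elements chunked but rebuild a single offsets buffer; flat dtypes
/// (bool, primitive, utf8/binary, null) are repacked into one contiguous array.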
pub(crate) fn try_canonicalize_chunks(
    chunks: &[ArrayRef],
    validity: Validity,
    dtype: &DType,
) -> VortexResult<Canonical> {
    match dtype {
        // Structs can have their internal field pointers swizzled to push the chunking down
        // one level internally without copying or decompressing any data.
        DType::Struct(struct_dtype, _) => {
            let struct_array = swizzle_struct_chunks(chunks, validity, struct_dtype)?;
            Ok(Canonical::Struct(struct_array))
        }

        // Extension arrays are containers that wrap an inner storage array with some metadata.
        // We delegate to the canonical format of the internal storage array for every chunk, and
        // push the chunking down into the inner storage array.
        //
        //  Input:
        //  ------
        //
        //                  ChunkedArray
        //                 /            \
        //                /              \
        //         ExtensionArray     ExtensionArray
        //             |                   |
        //          storage             storage
        //
        //
        //  Output:
        //  ------
        //
        //                 ExtensionArray
        //                      |
        //                 ChunkedArray
        //                /             \
        //          storage             storage
        //
        DType::Extension(ext_dtype) => {
            // Recursively apply canonicalization and packing to the storage array backing
            // each chunk of the extension array.
            let storage_chunks: Vec<ArrayRef> = chunks
                .iter()
                // Extension-typed arrays can be compressed into something that is not an
                // ExtensionArray, so we should canonicalize each chunk into ExtensionArray first.
                .map(|chunk| {
                    chunk
                        .clone()
                        .as_extension_typed()
                        .vortex_expect("Chunk could not be downcast to ExtensionArrayTrait")
                        .storage_data()
                })
                .collect();
            let storage_dtype = ext_dtype.storage_dtype().clone();
            let chunked_storage =
                ChunkedArray::try_new(storage_chunks, storage_dtype)?.into_array();

            Ok(Canonical::Extension(ExtensionArray::new(
                ext_dtype.clone(),
                chunked_storage,
            )))
        }

        DType::List(..) => {
            // TODO(joe): improve performance, use a listview, once it exists

            let list = pack_lists(chunks, validity, dtype)?;
            Ok(Canonical::List(list))
        }

        DType::Bool(_) => {
            let bool_array = pack_bools(chunks, validity)?;
            Ok(Canonical::Bool(bool_array))
        }
        DType::Primitive(ptype, _) => {
            match_each_native_ptype!(ptype, |$P| {
                let prim_array = pack_primitives::<$P>(chunks, validity)?;
                Ok(Canonical::Primitive(prim_array))
            })
        }
        DType::Utf8(_) | DType::Binary(_) => {
            let varbin_array = pack_views(chunks, dtype, validity)?;
            Ok(Canonical::VarBinView(varbin_array))
        }
        DType::Null => {
            let len = chunks.iter().map(|chunk| chunk.len()).sum();
            let null_array = NullArray::new(len);
            Ok(Canonical::Null(null_array))
        }
    }
}

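/// Builds a new [ListArray] by rebuilding a single contiguous offsets buffer while leaving the
/// element data of each chunk in place as a [ChunkedArray].
///
/// It is expected that this function is only called from [try_canonicalize_chunks], and thus all
/// chunks have already been checked to have the same DType.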
fn pack_lists(chunks: &[ArrayRef], validity: Validity, dtype: &DType) -> VortexResult<ListArray> {
    let len: usize = chunks.iter().map(|c| c.len()).sum();
    let mut offsets = BufferMut::<i64>::with_capacity(len + 1);
    offsets.push(0);
    let mut elements = Vec::new();
    let elem_dtype = dtype
        .as_list_element()
        .vortex_expect("ListArray must have List dtype");

    for chunk in chunks {
        let chunk = chunk.to_list()?;
        // TODO: handle i32 offsets if they fit.
        let offsets_arr = try_cast(
            chunk.offsets(),
            &DType::Primitive(PType::I64, Nullability::NonNullable),
        )?
        .to_primitive()?;

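        // A chunk may itself be a slice, so its offsets need not start at zero; trim the
        // element array down to just the range that this chunk's offsets actually reference.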
        let first_offset_value: usize = usize::try_from(&scalar_at(&offsets_arr, 0)?)?;
        let last_offset_value: usize =
            usize::try_from(&scalar_at(&offsets_arr, offsets_arr.len() - 1)?)?;
        elements.push(slice(
            chunk.elements(),
            first_offset_value,
            last_offset_value,
        )?);

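        // Re-base this chunk's offsets: subtract its starting offset and add the number of
        // elements already packed (the last offset written so far).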
        let adjustment_from_previous = *offsets
            .last()
            .ok_or_else(|| vortex_err!("List offsets must have at least one element"))?;
        offsets.extend(
            offsets_arr
                .as_slice::<i64>()
                .iter()
                .skip(1)
                .map(|off| off + adjustment_from_previous - first_offset_value as i64),
        );
    }
    let chunked_elements = ChunkedArray::try_new(elements, elem_dtype.clone())?.into_array();
    let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable);

    ListArray::try_new(chunked_elements, offsets.into_array(), validity)
}

/// Swizzle the pointers within a ChunkedArray of StructArrays into a single StructArray, where
/// the array for each field is a ChunkedArray of that field's chunks.
///
/// It is expected that this function is only called from [try_canonicalize_chunks], and thus all
/// chunks have already been checked to have the same DType.
fn swizzle_struct_chunks(
    chunks: &[ArrayRef],
    validity: Validity,
    struct_dtype: &StructDType,
) -> VortexResult<StructArray> {
    let len = chunks.iter().map(|chunk| chunk.len()).sum();
    let mut field_arrays = Vec::new();

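    // For each field, collect that field's array from every chunk and re-chunk them together.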
    for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
        let field_chunks = chunks
            .iter()
            .map(|c| {
                c.as_struct_typed()
                    .vortex_expect("Chunk was not a StructArray")
                    .maybe_null_field_by_idx(field_idx)
                    .vortex_expect("Invalid chunked array")
            })
            .collect::<Vec<_>>();
        let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
        field_arrays.push(field_array.into_array());
    }

    StructArray::try_new(struct_dtype.names().clone(), field_arrays, len, validity)
}

/// Builds a new [BoolArray] by repacking the values from the chunks into a single contiguous
/// array.
///
/// It is expected that this function is only called from [try_canonicalize_chunks], and thus all
/// chunks have already been checked to have the same DType.
fn pack_bools(chunks: &[ArrayRef], validity: Validity) -> VortexResult<BoolArray> {
    let len = chunks.iter().map(|chunk| chunk.len()).sum();
    let mut buffer = BooleanBufferBuilder::new(len);
    for chunk in chunks {
        let chunk = chunk.to_bool()?;
        buffer.append_buffer(chunk.boolean_buffer());
    }
    Ok(BoolArray::new(buffer.finish(), validity))
}

/// Builds a new [PrimitiveArray] by repacking the values from the chunks into a single
/// contiguous array.
///
/// It is expected that this function is only called from [try_canonicalize_chunks], and thus all
/// chunks have already been checked to have the same DType.
fn pack_primitives<T: NativePType>(
    chunks: &[ArrayRef],
    validity: Validity,
) -> VortexResult<PrimitiveArray> {
    let total_len = chunks.iter().map(|a| a.len()).sum();
    let mut buffer = BufferMut::with_capacity(total_len);
    for chunk in chunks {
        let chunk = chunk.to_primitive()?;
        buffer.extend_from_slice(chunk.as_slice::<T>());
    }
    Ok(PrimitiveArray::new(buffer.freeze(), validity))
}

/// Builds a new [VarBinViewArray] by repacking the values from the chunks into a single
/// contiguous array.
///
/// It is expected that this function is only called from [try_canonicalize_chunks], and thus all
/// chunks have already been checked to have the same DType.
fn pack_views(
    chunks: &[ArrayRef],
    dtype: &DType,
    validity: Validity,
) -> VortexResult<VarBinViewArray> {
    let total_len = chunks.iter().map(|a| a.len()).sum();
    let mut views = BufferMut::with_capacity(total_len);
    let mut buffers = Vec::new();
    for chunk in chunks {
        let buffers_offset = u32::try_from(buffers.len())?;
        let canonical_chunk = chunk.to_varbinview()?;
        buffers.extend(canonical_chunk.buffers().iter().cloned());

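        // Each chunk's views index into that chunk's own buffer list; shift the buffer
        // references so they point into the combined `buffers` vec instead.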
        views.extend(
            canonical_chunk
                .views()
                .iter()
                .map(|view| view.offset_view(buffers_offset)),
        );
    }

    VarBinViewArray::try_new(views.freeze(), buffers, dtype.clone(), validity)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_dtype::DType;
    use vortex_dtype::DType::{List, Primitive};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::array::Array;
    use crate::arrays::chunked::canonical::pack_views;
    use crate::arrays::{ChunkedArray, ListArray, PrimitiveArray, StructArray, VarBinViewArray};
    use crate::compute::{scalar_at, slice};
    use crate::validity::Validity;
    use crate::variants::StructArrayTrait;

    fn stringview_array() -> VarBinViewArray {
        VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"])
    }

    #[test]
    pub fn pack_sliced_varbin() {
        let array1 = slice(&stringview_array(), 1, 3).unwrap();
        let array2 = slice(&stringview_array(), 2, 4).unwrap();
        let packed = pack_views(
            &[array1, array2],
            &DType::Utf8(NonNullable),
            Validity::NonNullable,
        )
        .unwrap();
        assert_eq!(packed.len(), 4);
        let values = packed
            .with_iterator(|iter| {
                iter.flatten()
                    .map(|v| unsafe { String::from_utf8_unchecked(v.to_vec()) })
                    .collect::<Vec<_>>()
            })
            .unwrap();
        assert_eq!(values, &["bar", "baz", "baz", "quak"]);
    }

    #[test]
    pub fn pack_nested_structs() {
        let struct_array = StructArray::try_new(
            vec!["a".into()].into(),
            vec![stringview_array().into_array()],
            4,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = struct_array.dtype().clone();
        let chunked = ChunkedArray::try_new(
            vec![
                ChunkedArray::try_new(vec![struct_array.to_array()], dtype.clone())
                    .unwrap()
                    .into_array(),
            ],
            dtype,
        )
        .unwrap()
        .into_array();
        let canonical_struct = chunked.to_struct().unwrap();
        let canonical_varbin = canonical_struct
            .maybe_null_field_by_idx(0)
            .unwrap()
            .to_varbinview()
            .unwrap();
        let original_varbin = struct_array
            .maybe_null_field_by_idx(0)
            .unwrap()
            .to_varbinview()
            .unwrap();
        let orig_values = original_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let canon_values = canonical_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        assert_eq!(orig_values, canon_values);
    }

    #[test]
    pub fn pack_nested_lists() {
        let l1 = ListArray::try_new(
            PrimitiveArray::from_iter([1, 2, 3, 4]).into_array(),
            PrimitiveArray::from_iter([0, 3]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let l2 = ListArray::try_new(
            PrimitiveArray::from_iter([5, 6]).into_array(),
            PrimitiveArray::from_iter([0, 2]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let chunked_list = ChunkedArray::try_new(
            vec![l1.clone().into_array(), l2.clone().into_array()],
            List(Arc::new(Primitive(I32, NonNullable)), NonNullable),
        );

        let canon_values = chunked_list.unwrap().to_list().unwrap();

        assert_eq!(
            scalar_at(&l1, 0).unwrap(),
            scalar_at(&canon_values, 0).unwrap()
        );
        assert_eq!(
            scalar_at(&l2, 0).unwrap(),
            scalar_at(&canon_values, 1).unwrap()
        );
    }
}
377}