vortex_array/arrays/chunked/decode.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

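//! Canonicalization of [`ChunkedArray`]s: multi-chunk structs are transposed into a single
//! `StructArray` of chunked fields, lists are repacked onto a combined elements buffer, and
//! all other dtypes are appended into a canonical builder.
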
use vortex_buffer::BufferMut;
use vortex_dtype::{DType, Nullability, PType, StructFields};
use vortex_error::{VortexExpect, VortexUnwrap, vortex_err};

use super::ChunkedArray;
use crate::arrays::{ChunkedVTable, ListArray, PrimitiveArray, StructArray};
use crate::builders::{ArrayBuilder, builder_with_capacity};
use crate::compute::cast;
use crate::validity::Validity;
use crate::vtable::CanonicalVTable;
use crate::{Array as _, ArrayRef, Canonical, IntoArray, ToCanonical};

impl CanonicalVTable<ChunkedVTable> for ChunkedVTable {
    fn canonicalize(array: &ChunkedArray) -> Canonical {
        if array.nchunks() == 0 {
            return Canonical::empty(array.dtype());
        }
        if array.nchunks() == 1 {
            return array.chunks()[0].to_canonical();
        }

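        // Multiple chunks remain: dispatch on dtype. Structs are transposed into a single
        // StructArray of chunked fields, lists are repacked onto a combined elements buffer,
        // and everything else is appended chunk-by-chunk into a canonical builder.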
        match array.dtype() {
            DType::Struct(struct_dtype, _) => {
                let struct_array = swizzle_struct_chunks(
                    array.chunks(),
                    Validity::copy_from_array(array.as_ref()),
                    struct_dtype,
                );
                Canonical::Struct(struct_array)
            }
            DType::List(elem_dtype, _) => Canonical::List(pack_lists(
                array.chunks(),
                Validity::copy_from_array(array.as_ref()),
                elem_dtype,
            )),
            _ => {
                let mut builder = builder_with_capacity(array.dtype(), array.len());
                array.append_to_builder(builder.as_mut());
                builder.finish_into_canonical()
            }
        }
    }

    fn append_to_builder(array: &ChunkedArray, builder: &mut dyn ArrayBuilder) {
        for chunk in array.chunks() {
            chunk.append_to_builder(builder);
        }
    }
}

/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single
/// StructArray, where the Array for each Field is a ChunkedArray.
fn swizzle_struct_chunks(
    chunks: &[ArrayRef],
    validity: Validity,
    struct_dtype: &StructFields,
) -> StructArray {
    let len = chunks.iter().map(|chunk| chunk.len()).sum();
    let mut field_arrays = Vec::new();

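    // Transpose chunk-of-structs into struct-of-chunks: for each field, pull that field's
    // array out of every chunk and re-wrap the pieces as a single ChunkedArray.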
    for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
        let field_chunks = chunks
            .iter()
            .map(|c| {
                c.to_struct()
                    .fields()
                    .get(field_idx)
                    .vortex_expect("Invalid field index")
                    .to_array()
            })
            .collect::<Vec<_>>();
        // SAFETY: field_chunks are extracted from valid StructArrays with matching dtypes.
        // Each chunk's field array is guaranteed to be valid for field_dtype.
        let field_array = unsafe { ChunkedArray::new_unchecked(field_chunks, field_dtype.clone()) };
        field_arrays.push(field_array.into_array());
    }

    // SAFETY: field_arrays are built from corresponding chunks of same length, dtypes match by
    // construction.
    unsafe { StructArray::new_unchecked(field_arrays, struct_dtype.clone(), len, validity) }
}

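/// Concatenate the list chunks into a single ListArray by concatenating the referenced
/// element ranges and rebasing each chunk's offsets onto the combined elements buffer.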
fn pack_lists(chunks: &[ArrayRef], validity: Validity, elem_dtype: &DType) -> ListArray {
    let len: usize = chunks.iter().map(|c| c.len()).sum();
    let mut offsets = BufferMut::<u64>::with_capacity(len + 1);
    offsets.push(0);
    let mut elements = Vec::new();

    for chunk in chunks {
        let chunk = chunk.to_list();
        // TODO: handle i32 offsets if they fit.
        let offsets_arr = cast(
            chunk.offsets(),
            &DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .vortex_expect("Must fit array offsets in u64")
        .to_primitive();

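        // A chunk's offsets need not start at zero (e.g. after slicing), so carry over only
        // the element range this chunk actually references.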
        let first_offset_value: usize =
            usize::try_from(&offsets_arr.scalar_at(0)).vortex_expect("Offset must be a usize");
        let last_offset_value: usize =
            usize::try_from(&offsets_arr.scalar_at(offsets_arr.len() - 1))
                .vortex_expect("Offset must be a usize");
        elements.push(
            chunk
                .elements()
                .slice(first_offset_value..last_offset_value),
        );

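        // Rebase this chunk's offsets: subtract its first offset so they start at zero, then
        // add the running end offset of the elements packed so far.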
        let adjustment_from_previous = *offsets
            .last()
            .ok_or_else(|| vortex_err!("List offsets must have at least one element"))
            .vortex_unwrap();
        offsets.extend_trusted(
            offsets_arr
                .as_slice::<u64>()
                .iter()
                .skip(1)
                .map(|off| off + adjustment_from_previous - first_offset_value as u64),
        );
    }
    // SAFETY: elements are sliced from valid ListArrays with matching elem_dtype.
    // All elements arrays are guaranteed to be valid for elem_dtype.
    let chunked_elements =
        unsafe { ChunkedArray::new_unchecked(elements, elem_dtype.clone()) }.into_array();
    let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable);

    // SAFETY: chunked_elements contains valid elements from the original lists.
    // Offsets are monotonically increasing starting from 0, with each offset[i+1] >= offset[i],
    // and the last offset equals the total length of chunked_elements.
    // The validity matches the number of lists (offsets.len() - 1).
    unsafe { ListArray::new_unchecked(chunked_elements, offsets.into_array(), validity) }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::buffer;
    use vortex_dtype::DType::{List, Primitive};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;

    use crate::accessor::ArrayAccessor;
    use crate::arrays::{ChunkedArray, ListArray, StructArray, VarBinViewArray};
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical};

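    // Canonicalizing a chunked struct must preserve the values of nested variable-width fields.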
    #[test]
    pub fn pack_nested_structs() {
        let struct_array = StructArray::try_new(
            ["a"].into(),
            vec![VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array()],
            4,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = struct_array.dtype().clone();
        let chunked = ChunkedArray::try_new(
            vec![
                ChunkedArray::try_new(vec![struct_array.to_array()], dtype.clone())
                    .unwrap()
                    .into_array(),
            ],
            dtype,
        )
        .unwrap()
        .into_array();
        let canonical_struct = chunked.to_struct();
        let canonical_varbin = canonical_struct.fields()[0].to_varbinview();
        let original_varbin = struct_array.fields()[0].to_varbinview();
        let orig_values = original_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let canon_values = canonical_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        assert_eq!(orig_values, canon_values);
    }

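    // Packing the chunks [[1, 2, 3]] and [[5, 6]] should rebase offsets so both original
    // lists are recoverable from the canonical ListArray.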
    #[test]
    pub fn pack_nested_lists() {
        let l1 = ListArray::try_new(
            buffer![1, 2, 3, 4].into_array(),
            buffer![0, 3].into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let l2 = ListArray::try_new(
            buffer![5, 6].into_array(),
            buffer![0, 2].into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let chunked_list = ChunkedArray::try_new(
            vec![l1.clone().into_array(), l2.clone().into_array()],
            List(Arc::new(Primitive(I32, NonNullable)), NonNullable),
        );

        let canon_values = chunked_list.unwrap().to_list();

        assert_eq!(l1.scalar_at(0), canon_values.scalar_at(0));
        assert_eq!(l2.scalar_at(0), canon_values.scalar_at(1));
    }
}