vortex_array/arrays/chunked/decode.rs

1use vortex_buffer::BufferMut;
2use vortex_dtype::{DType, Nullability, PType, StructFields};
3use vortex_error::{VortexExpect, VortexResult, vortex_err};
4
5use super::ChunkedArray;
6use crate::arrays::{ChunkedVTable, ListArray, PrimitiveArray, StructArray};
7use crate::builders::{ArrayBuilder, builder_with_capacity};
8use crate::compute::cast;
9use crate::validity::Validity;
10use crate::vtable::CanonicalVTable;
11use crate::{Array as _, ArrayRef, Canonical, IntoArray, ToCanonical};
12
13impl CanonicalVTable<ChunkedVTable> for ChunkedVTable {
14    fn canonicalize(array: &ChunkedArray) -> VortexResult<Canonical> {
15        if array.nchunks() == 0 {
16            return Ok(Canonical::empty(array.dtype()));
17        }
18        if array.nchunks() == 1 {
19            return array.chunks()[0].to_canonical();
20        }
21        match array.dtype() {
22            DType::Struct(struct_dtype, _) => {
23                let struct_array = swizzle_struct_chunks(
24                    array.chunks(),
25                    Validity::copy_from_array(array.as_ref())?,
26                    struct_dtype,
27                )?;
28                Ok(Canonical::Struct(struct_array))
29            }
30            DType::List(elem, _) => Ok(Canonical::List(pack_lists(
31                array.chunks(),
32                Validity::copy_from_array(array.as_ref())?,
33                elem,
34            )?)),
35            _ => {
36                let mut builder = builder_with_capacity(array.dtype(), array.len());
37                array.append_to_builder(builder.as_mut())?;
38                builder.finish().to_canonical()
39            }
40        }
41    }
42
43    fn append_to_builder(array: &ChunkedArray, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
44        for chunk in array.chunks() {
45            chunk.append_to_builder(builder)?;
46        }
47        Ok(())
48    }
49}
50
51/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single
52/// StructArray, where the Array for each Field is a ChunkedArray.
53fn swizzle_struct_chunks(
54    chunks: &[ArrayRef],
55    validity: Validity,
56    struct_dtype: &StructFields,
57) -> VortexResult<StructArray> {
58    let len = chunks.iter().map(|chunk| chunk.len()).sum();
59    let mut field_arrays = Vec::new();
60
61    for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
62        let field_chunks = chunks
63            .iter()
64            .map(|c| {
65                c.to_struct()
66                    .vortex_expect("Chunk was not a StructArray")
67                    .fields()
68                    .get(field_idx)
69                    .vortex_expect("Invalid field index")
70                    .to_array()
71            })
72            .collect::<Vec<_>>();
73        let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
74        field_arrays.push(field_array.into_array());
75    }
76
77    StructArray::try_new_with_dtype(field_arrays, struct_dtype.clone(), len, validity)
78}
79
80fn pack_lists(
81    chunks: &[ArrayRef],
82    validity: Validity,
83    elem_dtype: &DType,
84) -> VortexResult<ListArray> {
85    let len: usize = chunks.iter().map(|c| c.len()).sum();
86    let mut offsets = BufferMut::<i64>::with_capacity(len + 1);
87    offsets.push(0);
88    let mut elements = Vec::new();
89
90    for chunk in chunks {
91        let chunk = chunk.to_list()?;
92        // TODO: handle i32 offsets if they fit.
93        let offsets_arr = cast(
94            chunk.offsets(),
95            &DType::Primitive(PType::I64, Nullability::NonNullable),
96        )?
97        .to_primitive()?;
98
99        let first_offset_value: usize = usize::try_from(&offsets_arr.scalar_at(0)?)?;
100        let last_offset_value: usize =
101            usize::try_from(&offsets_arr.scalar_at(offsets_arr.len() - 1)?)?;
102        elements.push(
103            chunk
104                .elements()
105                .slice(first_offset_value, last_offset_value)?,
106        );
107
108        let adjustment_from_previous = *offsets
109            .last()
110            .ok_or_else(|| vortex_err!("List offsets must have at least one element"))?;
111        offsets.extend(
112            offsets_arr
113                .as_slice::<i64>()
114                .iter()
115                .skip(1)
116                .map(|off| off + adjustment_from_previous - first_offset_value as i64),
117        );
118    }
119    let chunked_elements = ChunkedArray::try_new(elements, elem_dtype.clone())?.into_array();
120    let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable);
121
122    ListArray::try_new(chunked_elements, offsets.into_array(), validity)
123}
124
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_dtype::DType::{List, Primitive};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;

    use crate::accessor::ArrayAccessor;
    use crate::arrays::{ChunkedArray, ListArray, PrimitiveArray, StructArray, VarBinViewArray};
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical};

    // A chunked array whose only chunk is itself a single-chunk chunked struct array
    // canonicalizes back to the original struct values (exercises the one-chunk path).
    #[test]
    fn pack_nested_structs() {
        let struct_array = StructArray::try_new(
            vec!["a".into()].into(),
            vec![VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array()],
            4,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = struct_array.dtype().clone();
        let chunked = ChunkedArray::try_new(
            vec![
                ChunkedArray::try_new(vec![struct_array.to_array()], dtype.clone())
                    .unwrap()
                    .into_array(),
            ],
            dtype,
        )
        .unwrap()
        .into_array();
        let canonical_struct = chunked.to_struct().unwrap();
        let canonical_varbin = canonical_struct.fields()[0].to_varbinview().unwrap();
        let original_varbin = struct_array.fields()[0].to_varbinview().unwrap();
        let orig_values = original_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let canon_values = canonical_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        assert_eq!(orig_values, canon_values);
    }

    // Several struct chunks are swizzled into one struct whose field holds every chunk's
    // values in chunk order (exercises `swizzle_struct_chunks`).
    #[test]
    fn pack_multi_chunk_structs() {
        let first = StructArray::try_new(
            vec!["a".into()].into(),
            vec![VarBinViewArray::from_iter_str(["foo", "bar"]).into_array()],
            2,
            Validity::NonNullable,
        )
        .unwrap();
        let second = StructArray::try_new(
            vec!["a".into()].into(),
            vec![VarBinViewArray::from_iter_str(["baz", "quak"]).into_array()],
            2,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = first.dtype().clone();
        let chunked = ChunkedArray::try_new(vec![first.into_array(), second.into_array()], dtype)
            .unwrap()
            .into_array();

        let canonical_struct = chunked.to_struct().unwrap();
        let values = canonical_struct.fields()[0]
            .to_varbinview()
            .unwrap()
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let expected = ["foo", "bar", "baz", "quak"].map(|s| Some(s.as_bytes().to_vec()));
        assert_eq!(values, expected);
    }

    // Several list chunks are packed into one list with rebased offsets. `l1` has an
    // unreferenced trailing element (`4`), so the second list must start at offset 3.
    #[test]
    fn pack_nested_lists() {
        let l1 = ListArray::try_new(
            PrimitiveArray::from_iter([1, 2, 3, 4]).into_array(),
            PrimitiveArray::from_iter([0, 3]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let l2 = ListArray::try_new(
            PrimitiveArray::from_iter([5, 6]).into_array(),
            PrimitiveArray::from_iter([0, 2]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let chunked_list = ChunkedArray::try_new(
            vec![l1.clone().into_array(), l2.clone().into_array()],
            List(Arc::new(Primitive(I32, NonNullable)), NonNullable),
        );

        let canon_values = chunked_list.unwrap().to_list().unwrap();

        assert_eq!(l1.scalar_at(0).unwrap(), canon_values.scalar_at(0).unwrap());
        assert_eq!(l2.scalar_at(0).unwrap(), canon_values.scalar_at(1).unwrap());

        let packed_offsets = canon_values.offsets().to_primitive().unwrap();
        assert_eq!(packed_offsets.as_slice::<i64>(), &[0i64, 3, 5]);
    }
}