vortex_array/arrays/chunked/
decode.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_buffer::BufferMut;
5use vortex_dtype::{DType, Nullability, PType, StructFields};
6use vortex_error::{VortexExpect, VortexResult, vortex_err};
7
8use super::ChunkedArray;
9use crate::arrays::{ChunkedVTable, ListArray, PrimitiveArray, StructArray};
10use crate::builders::{ArrayBuilder, builder_with_capacity};
11use crate::compute::cast;
12use crate::validity::Validity;
13use crate::vtable::CanonicalVTable;
14use crate::{Array as _, ArrayRef, Canonical, IntoArray, ToCanonical};
15
16impl CanonicalVTable<ChunkedVTable> for ChunkedVTable {
17    fn canonicalize(array: &ChunkedArray) -> VortexResult<Canonical> {
18        if array.nchunks() == 0 {
19            return Ok(Canonical::empty(array.dtype()));
20        }
21        if array.nchunks() == 1 {
22            return array.chunks()[0].to_canonical();
23        }
24        match array.dtype() {
25            DType::Struct(struct_dtype, _) => {
26                let struct_array = swizzle_struct_chunks(
27                    array.chunks(),
28                    Validity::copy_from_array(array.as_ref())?,
29                    struct_dtype,
30                )?;
31                Ok(Canonical::Struct(struct_array))
32            }
33            DType::List(elem, _) => Ok(Canonical::List(pack_lists(
34                array.chunks(),
35                Validity::copy_from_array(array.as_ref())?,
36                elem,
37            )?)),
38            _ => {
39                let mut builder = builder_with_capacity(array.dtype(), array.len());
40                array.append_to_builder(builder.as_mut())?;
41                builder.finish().to_canonical()
42            }
43        }
44    }
45
46    fn append_to_builder(array: &ChunkedArray, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
47        for chunk in array.chunks() {
48            chunk.append_to_builder(builder)?;
49        }
50        Ok(())
51    }
52}
53
54/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single
55/// StructArray, where the Array for each Field is a ChunkedArray.
56fn swizzle_struct_chunks(
57    chunks: &[ArrayRef],
58    validity: Validity,
59    struct_dtype: &StructFields,
60) -> VortexResult<StructArray> {
61    let len = chunks.iter().map(|chunk| chunk.len()).sum();
62    let mut field_arrays = Vec::new();
63
64    for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
65        let field_chunks = chunks
66            .iter()
67            .map(|c| {
68                c.to_struct()
69                    .vortex_expect("Chunk was not a StructArray")
70                    .fields()
71                    .get(field_idx)
72                    .vortex_expect("Invalid field index")
73                    .to_array()
74            })
75            .collect::<Vec<_>>();
76        let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
77        field_arrays.push(field_array.into_array());
78    }
79
80    StructArray::try_new_with_dtype(field_arrays, struct_dtype.clone(), len, validity)
81}
82
83fn pack_lists(
84    chunks: &[ArrayRef],
85    validity: Validity,
86    elem_dtype: &DType,
87) -> VortexResult<ListArray> {
88    let len: usize = chunks.iter().map(|c| c.len()).sum();
89    let mut offsets = BufferMut::<i64>::with_capacity(len + 1);
90    offsets.push(0);
91    let mut elements = Vec::new();
92
93    for chunk in chunks {
94        let chunk = chunk.to_list()?;
95        // TODO: handle i32 offsets if they fit.
96        let offsets_arr = cast(
97            chunk.offsets(),
98            &DType::Primitive(PType::I64, Nullability::NonNullable),
99        )?
100        .to_primitive()?;
101
102        let first_offset_value: usize = usize::try_from(&offsets_arr.scalar_at(0)?)?;
103        let last_offset_value: usize =
104            usize::try_from(&offsets_arr.scalar_at(offsets_arr.len() - 1)?)?;
105        elements.push(
106            chunk
107                .elements()
108                .slice(first_offset_value, last_offset_value)?,
109        );
110
111        let adjustment_from_previous = *offsets
112            .last()
113            .ok_or_else(|| vortex_err!("List offsets must have at least one element"))?;
114        offsets.extend_trusted(
115            offsets_arr
116                .as_slice::<i64>()
117                .iter()
118                .skip(1)
119                .map(|off| off + adjustment_from_previous - first_offset_value as i64),
120        );
121    }
122    let chunked_elements = ChunkedArray::try_new(elements, elem_dtype.clone())?.into_array();
123    let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable);
124
125    ListArray::try_new(chunked_elements, offsets.into_array(), validity)
126}
127
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_dtype::DType::{List, Primitive};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;

    use crate::accessor::ArrayAccessor;
    use crate::arrays::{ChunkedArray, ListArray, PrimitiveArray, StructArray, VarBinViewArray};
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical};

    /// A struct array wrapped in two levels of `ChunkedArray` must canonicalize to a struct
    /// whose string field holds the same values as the source.
    #[test]
    pub fn pack_nested_structs() {
        let strings = VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array();
        let source = StructArray::try_new(
            vec!["a".into()].into(),
            vec![strings],
            4,
            Validity::NonNullable,
        )
        .unwrap();
        let struct_dtype = source.dtype().clone();

        // Chunked array whose only chunk is itself a single-chunk chunked array.
        let inner = ChunkedArray::try_new(vec![source.to_array()], struct_dtype.clone())
            .unwrap()
            .into_array();
        let outer = ChunkedArray::try_new(vec![inner], struct_dtype)
            .unwrap()
            .into_array();

        let canonical = outer.to_struct().unwrap();
        let canonical_field = canonical.fields()[0].to_varbinview().unwrap();
        let source_field = source.fields()[0].to_varbinview().unwrap();

        let expected = source_field
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let actual = canonical_field
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();

        assert_eq!(expected, actual);
    }

    /// Two single-list chunks must canonicalize to a list array whose rows equal the row of
    /// each source chunk, in chunk order.
    #[test]
    pub fn pack_nested_lists() {
        // Offsets [0, 3] reference only the first three of the four elements.
        let first = ListArray::try_new(
            PrimitiveArray::from_iter([1, 2, 3, 4]).into_array(),
            PrimitiveArray::from_iter([0, 3]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let second = ListArray::try_new(
            PrimitiveArray::from_iter([5, 6]).into_array(),
            PrimitiveArray::from_iter([0, 2]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let list_dtype = List(Arc::new(Primitive(I32, NonNullable)), NonNullable);
        let packed = ChunkedArray::try_new(
            vec![first.clone().into_array(), second.clone().into_array()],
            list_dtype,
        )
        .unwrap()
        .to_list()
        .unwrap();

        assert_eq!(first.scalar_at(0).unwrap(), packed.scalar_at(0).unwrap());
        assert_eq!(second.scalar_at(0).unwrap(), packed.scalar_at(1).unwrap());
    }
}