vortex_array/arrays/chunked/decode.rs

1use std::sync::Arc;
2
3use vortex_buffer::BufferMut;
4use vortex_dtype::{DType, Nullability, PType, StructDType};
5use vortex_error::{VortexExpect, VortexResult, vortex_err};
6
7use super::ChunkedArray;
8use crate::arrays::{ChunkedVTable, ListArray, PrimitiveArray, StructArray};
9use crate::builders::{ArrayBuilder, builder_with_capacity};
10use crate::compute::cast;
11use crate::validity::Validity;
12use crate::vtable::CanonicalVTable;
13use crate::{Array as _, ArrayRef, Canonical, IntoArray, ToCanonical};
14
15impl CanonicalVTable<ChunkedVTable> for ChunkedVTable {
16    fn canonicalize(array: &ChunkedArray) -> VortexResult<Canonical> {
17        if array.nchunks() == 0 {
18            return Ok(Canonical::empty(array.dtype()));
19        }
20        if array.nchunks() == 1 {
21            return array.chunks()[0].to_canonical();
22        }
23        match array.dtype() {
24            DType::Struct(struct_dtype, _) => {
25                let struct_array = swizzle_struct_chunks(
26                    array.chunks(),
27                    Validity::copy_from_array(array.as_ref())?,
28                    struct_dtype,
29                )?;
30                Ok(Canonical::Struct(struct_array))
31            }
32            DType::List(elem, _) => Ok(Canonical::List(pack_lists(
33                array.chunks(),
34                Validity::copy_from_array(array.as_ref())?,
35                elem,
36            )?)),
37            _ => {
38                let mut builder = builder_with_capacity(array.dtype(), array.len());
39                array.append_to_builder(builder.as_mut())?;
40                builder.finish().to_canonical()
41            }
42        }
43    }
44
45    fn append_to_builder(array: &ChunkedArray, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
46        for chunk in array.chunks() {
47            chunk.append_to_builder(builder)?;
48        }
49        Ok(())
50    }
51}
52
53/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single
54/// StructArray, where the Array for each Field is a ChunkedArray.
55fn swizzle_struct_chunks(
56    chunks: &[ArrayRef],
57    validity: Validity,
58    struct_dtype: &Arc<StructDType>,
59) -> VortexResult<StructArray> {
60    let len = chunks.iter().map(|chunk| chunk.len()).sum();
61    let mut field_arrays = Vec::new();
62
63    for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
64        let field_chunks = chunks
65            .iter()
66            .map(|c| {
67                c.to_struct()
68                    .vortex_expect("Chunk was not a StructArray")
69                    .fields()
70                    .get(field_idx)
71                    .vortex_expect("Invalid field index")
72                    .to_array()
73            })
74            .collect::<Vec<_>>();
75        let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
76        field_arrays.push(field_array.into_array());
77    }
78
79    StructArray::try_new_with_dtype(field_arrays, struct_dtype.clone(), len, validity)
80}
81
82fn pack_lists(
83    chunks: &[ArrayRef],
84    validity: Validity,
85    elem_dtype: &DType,
86) -> VortexResult<ListArray> {
87    let len: usize = chunks.iter().map(|c| c.len()).sum();
88    let mut offsets = BufferMut::<i64>::with_capacity(len + 1);
89    offsets.push(0);
90    let mut elements = Vec::new();
91
92    for chunk in chunks {
93        let chunk = chunk.to_list()?;
94        // TODO: handle i32 offsets if they fit.
95        let offsets_arr = cast(
96            chunk.offsets(),
97            &DType::Primitive(PType::I64, Nullability::NonNullable),
98        )?
99        .to_primitive()?;
100
101        let first_offset_value: usize = usize::try_from(&offsets_arr.scalar_at(0)?)?;
102        let last_offset_value: usize =
103            usize::try_from(&offsets_arr.scalar_at(offsets_arr.len() - 1)?)?;
104        elements.push(
105            chunk
106                .elements()
107                .slice(first_offset_value, last_offset_value)?,
108        );
109
110        let adjustment_from_previous = *offsets
111            .last()
112            .ok_or_else(|| vortex_err!("List offsets must have at least one element"))?;
113        offsets.extend(
114            offsets_arr
115                .as_slice::<i64>()
116                .iter()
117                .skip(1)
118                .map(|off| off + adjustment_from_previous - first_offset_value as i64),
119        );
120    }
121    let chunked_elements = ChunkedArray::try_new(elements, elem_dtype.clone())?.into_array();
122    let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable);
123
124    ListArray::try_new(chunked_elements, offsets.into_array(), validity)
125}
126
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_dtype::DType::{List, Primitive};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;

    use crate::accessor::ArrayAccessor;
    use crate::arrays::{ChunkedArray, ListArray, PrimitiveArray, StructArray, VarBinViewArray};
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical};

    /// A chunked array wrapping a single nested chunked struct canonicalizes to the original
    /// struct values.
    #[test]
    pub fn pack_nested_structs() {
        let struct_array = StructArray::try_new(
            vec!["a".into()].into(),
            vec![VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array()],
            4,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = struct_array.dtype().clone();
        let chunked = ChunkedArray::try_new(
            vec![
                ChunkedArray::try_new(vec![struct_array.to_array()], dtype.clone())
                    .unwrap()
                    .into_array(),
            ],
            dtype,
        )
        .unwrap()
        .into_array();
        let canonical_struct = chunked.to_struct().unwrap();
        let canonical_varbin = canonical_struct.fields()[0].to_varbinview().unwrap();
        let original_varbin = struct_array.fields()[0].to_varbinview().unwrap();
        let orig_values = original_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let canon_values = canonical_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        assert_eq!(orig_values, canon_values);
    }

    /// Two struct chunks force canonicalization through `swizzle_struct_chunks` rather than
    /// the single-chunk fast path; field values must come out concatenated in chunk order.
    #[test]
    pub fn pack_multi_chunk_structs() {
        let first = StructArray::try_new(
            vec!["a".into()].into(),
            vec![VarBinViewArray::from_iter_str(["foo", "bar"]).into_array()],
            2,
            Validity::NonNullable,
        )
        .unwrap();
        let second = StructArray::try_new(
            vec!["a".into()].into(),
            vec![VarBinViewArray::from_iter_str(["baz", "quak"]).into_array()],
            2,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = first.dtype().clone();
        let chunked = ChunkedArray::try_new(vec![first.into_array(), second.into_array()], dtype)
            .unwrap()
            .into_array();

        let canonical_struct = chunked.to_struct().unwrap();
        let values = canonical_struct.fields()[0]
            .to_varbinview()
            .unwrap()
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let expected = ["foo", "bar", "baz", "quak"]
            .iter()
            .map(|s| Some(s.as_bytes().to_vec()))
            .collect::<Vec<_>>();
        assert_eq!(values, expected);
    }

    /// Two list chunks are packed into one list array that preserves each list's contents.
    #[test]
    pub fn pack_nested_lists() {
        let l1 = ListArray::try_new(
            PrimitiveArray::from_iter([1, 2, 3, 4]).into_array(),
            PrimitiveArray::from_iter([0, 3]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let l2 = ListArray::try_new(
            PrimitiveArray::from_iter([5, 6]).into_array(),
            PrimitiveArray::from_iter([0, 2]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let chunked_list = ChunkedArray::try_new(
            vec![l1.clone().into_array(), l2.clone().into_array()],
            List(Arc::new(Primitive(I32, NonNullable)), NonNullable),
        );

        let canon_values = chunked_list.unwrap().to_list().unwrap();

        assert_eq!(l1.scalar_at(0).unwrap(), canon_values.scalar_at(0).unwrap());
        assert_eq!(l2.scalar_at(0).unwrap(), canon_values.scalar_at(1).unwrap());
    }

    /// The first chunk's offsets start at 1, so packing must slice off the unreferenced
    /// leading element and rebase the offsets of both chunks.
    #[test]
    pub fn pack_lists_with_nonzero_first_offset() {
        let l1 = ListArray::try_new(
            PrimitiveArray::from_iter([1, 2, 3, 4]).into_array(),
            PrimitiveArray::from_iter([1, 3]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let l2 = ListArray::try_new(
            PrimitiveArray::from_iter([5, 6]).into_array(),
            PrimitiveArray::from_iter([0, 2]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let canon_values = ChunkedArray::try_new(
            vec![l1.clone().into_array(), l2.clone().into_array()],
            List(Arc::new(Primitive(I32, NonNullable)), NonNullable),
        )
        .unwrap()
        .to_list()
        .unwrap();

        assert_eq!(l1.scalar_at(0).unwrap(), canon_values.scalar_at(0).unwrap());
        assert_eq!(l2.scalar_at(0).unwrap(), canon_values.scalar_at(1).unwrap());
    }
}