vortex_array/arrays/chunked/
canonical.rs

1use std::sync::Arc;
2
3use vortex_buffer::BufferMut;
4use vortex_dtype::{DType, Nullability, PType, StructDType};
5use vortex_error::{VortexExpect, VortexResult, vortex_err};
6
7use super::ChunkedArray;
8use crate::arrays::{ListArray, PrimitiveArray, StructArray};
9use crate::builders::{ArrayBuilder, builder_with_capacity};
10use crate::compute::{scalar_at, slice, try_cast};
11use crate::validity::Validity;
12use crate::{Array as _, ArrayCanonicalImpl, ArrayRef, Canonical, ToCanonical};
13
14impl ArrayCanonicalImpl for ChunkedArray {
15    fn _to_canonical(&self) -> VortexResult<Canonical> {
16        match self.dtype() {
17            DType::Struct(struct_dtype, _) => {
18                let struct_array = swizzle_struct_chunks(
19                    self.chunks(),
20                    Validity::copy_from_array(self)?,
21                    struct_dtype,
22                )?;
23                Ok(Canonical::Struct(struct_array))
24            }
25            DType::List(elem, _) => Ok(Canonical::List(pack_lists(
26                self.chunks(),
27                Validity::copy_from_array(self)?,
28                elem,
29            )?)),
30            _ => {
31                let mut builder = builder_with_capacity(self.dtype(), self.len());
32                self.append_to_builder(builder.as_mut())?;
33                builder.finish().to_canonical()
34            }
35        }
36    }
37
38    fn _append_to_builder(&self, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
39        for chunk in self.chunks() {
40            chunk.append_to_builder(builder)?;
41        }
42        Ok(())
43    }
44}
45
46/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single
47/// StructArray, where the Array for each Field is a ChunkedArray.
48fn swizzle_struct_chunks(
49    chunks: &[ArrayRef],
50    validity: Validity,
51    struct_dtype: &Arc<StructDType>,
52) -> VortexResult<StructArray> {
53    let len = chunks.iter().map(|chunk| chunk.len()).sum();
54    let mut field_arrays = Vec::new();
55
56    for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
57        let field_chunks = chunks
58            .iter()
59            .map(|c| {
60                c.as_struct_typed()
61                    .vortex_expect("Chunk was not a StructArray")
62                    .maybe_null_field_by_idx(field_idx)
63                    .vortex_expect("Invalid chunked array")
64            })
65            .collect::<Vec<_>>();
66        let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
67        field_arrays.push(field_array.into_array());
68    }
69
70    StructArray::try_new_with_dtype(field_arrays, struct_dtype.clone(), len, validity)
71}
72
73fn pack_lists(
74    chunks: &[ArrayRef],
75    validity: Validity,
76    elem_dtype: &DType,
77) -> VortexResult<ListArray> {
78    let len: usize = chunks.iter().map(|c| c.len()).sum();
79    let mut offsets = BufferMut::<i64>::with_capacity(len + 1);
80    offsets.push(0);
81    let mut elements = Vec::new();
82
83    for chunk in chunks {
84        let chunk = chunk.to_list()?;
85        // TODO: handle i32 offsets if they fit.
86        let offsets_arr = try_cast(
87            chunk.offsets(),
88            &DType::Primitive(PType::I64, Nullability::NonNullable),
89        )?
90        .to_primitive()?;
91
92        let first_offset_value: usize = usize::try_from(&scalar_at(&offsets_arr, 0)?)?;
93        let last_offset_value: usize =
94            usize::try_from(&scalar_at(&offsets_arr, offsets_arr.len() - 1)?)?;
95        elements.push(slice(
96            chunk.elements(),
97            first_offset_value,
98            last_offset_value,
99        )?);
100
101        let adjustment_from_previous = *offsets
102            .last()
103            .ok_or_else(|| vortex_err!("List offsets must have at least one element"))?;
104        offsets.extend(
105            offsets_arr
106                .as_slice::<i64>()
107                .iter()
108                .skip(1)
109                .map(|off| off + adjustment_from_previous - first_offset_value as i64),
110        );
111    }
112    let chunked_elements = ChunkedArray::try_new(elements, elem_dtype.clone())?.into_array();
113    let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable);
114
115    ListArray::try_new(chunked_elements, offsets.into_array(), validity)
116}
117
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_dtype::DType::{List, Primitive};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::array::Array;
    use crate::arrays::{ChunkedArray, ListArray, PrimitiveArray, StructArray, VarBinViewArray};
    use crate::compute::scalar_at;
    use crate::validity::Validity;
    use crate::variants::StructArrayTrait;

    /// A chunked array wrapping another chunked array of structs must canonicalize to a
    /// struct whose first field holds the same values as the original struct's field.
    #[test]
    pub fn pack_nested_structs() {
        let names = VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array();
        let struct_array =
            StructArray::try_new(vec!["a".into()].into(), vec![names], 4, Validity::NonNullable)
                .unwrap();
        let dtype = struct_array.dtype().clone();

        // Chunked-of-chunked: the inner chunked array is the single chunk of the outer one.
        let inner = ChunkedArray::try_new(vec![struct_array.to_array()], dtype.clone())
            .unwrap()
            .into_array();
        let nested = ChunkedArray::try_new(vec![inner], dtype)
            .unwrap()
            .into_array();

        let canonical_struct = nested.to_struct().unwrap();
        let canonical_field = canonical_struct
            .maybe_null_field_by_idx(0)
            .unwrap()
            .to_varbinview()
            .unwrap();
        let original_field = struct_array
            .maybe_null_field_by_idx(0)
            .unwrap()
            .to_varbinview()
            .unwrap();

        let expected = original_field
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let actual = canonical_field
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();

        assert_eq!(expected, actual);
    }

    /// Two single-list chunks must canonicalize to a list array whose entries match the
    /// first entry of each source chunk, in chunk order.
    #[test]
    pub fn pack_nested_lists() {
        let first = ListArray::try_new(
            PrimitiveArray::from_iter([1, 2, 3, 4]).into_array(),
            PrimitiveArray::from_iter([0, 3]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();
        let second = ListArray::try_new(
            PrimitiveArray::from_iter([5, 6]).into_array(),
            PrimitiveArray::from_iter([0, 2]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let list_dtype = List(Arc::new(Primitive(I32, NonNullable)), NonNullable);
        let chunk_refs = vec![first.clone().into_array(), second.clone().into_array()];
        let packed = ChunkedArray::try_new(chunk_refs, list_dtype)
            .unwrap()
            .to_list()
            .unwrap();

        // Entry `i` of the packed list must equal entry 0 of the `i`-th source chunk.
        for (packed_idx, source) in [&first, &second].into_iter().enumerate() {
            assert_eq!(
                scalar_at(source, 0).unwrap(),
                scalar_at(&packed, packed_idx).unwrap()
            );
        }
    }
}