// vortex_array/arrays/chunked/canonical.rs

1use std::sync::Arc;
2
3use vortex_buffer::BufferMut;
4use vortex_dtype::{DType, Nullability, PType, StructDType};
5use vortex_error::{VortexExpect, VortexResult, vortex_err};
6
7use super::ChunkedArray;
8use crate::arrays::{ListArray, PrimitiveArray, StructArray};
9use crate::builders::{ArrayBuilder, builder_with_capacity};
10use crate::compute::{cast, scalar_at, slice};
11use crate::validity::Validity;
12use crate::{Array as _, ArrayCanonicalImpl, ArrayRef, Canonical, ToCanonical};
13
14impl ArrayCanonicalImpl for ChunkedArray {
15    fn _to_canonical(&self) -> VortexResult<Canonical> {
16        if self.nchunks() == 0 {
17            return Ok(Canonical::empty(self.dtype()));
18        }
19        if self.nchunks() == 1 {
20            return self.chunks()[0].to_canonical();
21        }
22        match self.dtype() {
23            DType::Struct(struct_dtype, _) => {
24                let struct_array = swizzle_struct_chunks(
25                    self.chunks(),
26                    Validity::copy_from_array(self)?,
27                    struct_dtype,
28                )?;
29                Ok(Canonical::Struct(struct_array))
30            }
31            DType::List(elem, _) => Ok(Canonical::List(pack_lists(
32                self.chunks(),
33                Validity::copy_from_array(self)?,
34                elem,
35            )?)),
36            _ => {
37                let mut builder = builder_with_capacity(self.dtype(), self.len());
38                self.append_to_builder(builder.as_mut())?;
39                builder.finish().to_canonical()
40            }
41        }
42    }
43
44    fn _append_to_builder(&self, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
45        for chunk in self.chunks() {
46            chunk.append_to_builder(builder)?;
47        }
48        Ok(())
49    }
50}
51
52/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single
53/// StructArray, where the Array for each Field is a ChunkedArray.
54fn swizzle_struct_chunks(
55    chunks: &[ArrayRef],
56    validity: Validity,
57    struct_dtype: &Arc<StructDType>,
58) -> VortexResult<StructArray> {
59    let len = chunks.iter().map(|chunk| chunk.len()).sum();
60    let mut field_arrays = Vec::new();
61
62    for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
63        let field_chunks = chunks
64            .iter()
65            .map(|c| {
66                c.as_struct_typed()
67                    .vortex_expect("Chunk was not a StructArray")
68                    .maybe_null_field_by_idx(field_idx)
69                    .vortex_expect("Invalid chunked array")
70            })
71            .collect::<Vec<_>>();
72        let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
73        field_arrays.push(field_array.into_array());
74    }
75
76    StructArray::try_new_with_dtype(field_arrays, struct_dtype.clone(), len, validity)
77}
78
79fn pack_lists(
80    chunks: &[ArrayRef],
81    validity: Validity,
82    elem_dtype: &DType,
83) -> VortexResult<ListArray> {
84    let len: usize = chunks.iter().map(|c| c.len()).sum();
85    let mut offsets = BufferMut::<i64>::with_capacity(len + 1);
86    offsets.push(0);
87    let mut elements = Vec::new();
88
89    for chunk in chunks {
90        let chunk = chunk.to_list()?;
91        // TODO: handle i32 offsets if they fit.
92        let offsets_arr = cast(
93            chunk.offsets(),
94            &DType::Primitive(PType::I64, Nullability::NonNullable),
95        )?
96        .to_primitive()?;
97
98        let first_offset_value: usize = usize::try_from(&scalar_at(&offsets_arr, 0)?)?;
99        let last_offset_value: usize =
100            usize::try_from(&scalar_at(&offsets_arr, offsets_arr.len() - 1)?)?;
101        elements.push(slice(
102            chunk.elements(),
103            first_offset_value,
104            last_offset_value,
105        )?);
106
107        let adjustment_from_previous = *offsets
108            .last()
109            .ok_or_else(|| vortex_err!("List offsets must have at least one element"))?;
110        offsets.extend(
111            offsets_arr
112                .as_slice::<i64>()
113                .iter()
114                .skip(1)
115                .map(|off| off + adjustment_from_previous - first_offset_value as i64),
116        );
117    }
118    let chunked_elements = ChunkedArray::try_new(elements, elem_dtype.clone())?.into_array();
119    let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable);
120
121    ListArray::try_new(chunked_elements, offsets.into_array(), validity)
122}
123
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_dtype::DType::{List, Primitive};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::array::Array;
    use crate::arrays::{ChunkedArray, ListArray, PrimitiveArray, StructArray, VarBinViewArray};
    use crate::compute::scalar_at;
    use crate::validity::Validity;
    use crate::variants::StructArrayTrait;

    /// A struct nested inside single-chunk `ChunkedArray`s canonicalizes back to
    /// the original field values.
    ///
    /// Both the outer and inner chunked arrays hold exactly one chunk, so this
    /// exercises the single-chunk shortcut rather than `swizzle_struct_chunks`.
    /// See `pack_multiple_struct_chunks` for the multi-chunk path.
    #[test]
    pub fn pack_nested_structs() {
        let struct_array = StructArray::try_new(
            vec!["a".into()].into(),
            vec![VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array()],
            4,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = struct_array.dtype().clone();
        let chunked = ChunkedArray::try_new(
            vec![
                ChunkedArray::try_new(vec![struct_array.to_array()], dtype.clone())
                    .unwrap()
                    .into_array(),
            ],
            dtype,
        )
        .unwrap()
        .into_array();
        let canonical_struct = chunked.to_struct().unwrap();
        let canonical_varbin = canonical_struct
            .maybe_null_field_by_idx(0)
            .unwrap()
            .to_varbinview()
            .unwrap();
        let original_varbin = struct_array
            .maybe_null_field_by_idx(0)
            .unwrap()
            .to_varbinview()
            .unwrap();
        let orig_values = original_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let canon_values = canonical_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        assert_eq!(orig_values, canon_values);
    }

    /// Two struct chunks are swizzled into one struct whose field holds the
    /// concatenation of both chunks' field values, in chunk order.
    #[test]
    pub fn pack_multiple_struct_chunks() {
        let first = StructArray::try_new(
            vec!["a".into()].into(),
            vec![VarBinViewArray::from_iter_str(["foo", "bar"]).into_array()],
            2,
            Validity::NonNullable,
        )
        .unwrap();
        let second = StructArray::try_new(
            vec!["a".into()].into(),
            vec![VarBinViewArray::from_iter_str(["baz", "quak"]).into_array()],
            2,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = first.dtype().clone();
        let chunked = ChunkedArray::try_new(vec![first.into_array(), second.into_array()], dtype)
            .unwrap()
            .into_array();

        let canonical_struct = chunked.to_struct().unwrap();
        assert_eq!(canonical_struct.len(), 4);

        let canonical_varbin = canonical_struct
            .maybe_null_field_by_idx(0)
            .unwrap()
            .to_varbinview()
            .unwrap();
        let canon_values = canonical_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let expected = ["foo", "bar", "baz", "quak"]
            .iter()
            .map(|s| Some(s.as_bytes().to_vec()))
            .collect::<Vec<_>>();
        assert_eq!(canon_values, expected);
    }

    /// Two list chunks are packed into one list array. In `l1`, element `4` is
    /// not referenced by any offset, so it must not leak into the packed result.
    #[test]
    pub fn pack_nested_lists() {
        let l1 = ListArray::try_new(
            PrimitiveArray::from_iter([1, 2, 3, 4]).into_array(),
            PrimitiveArray::from_iter([0, 3]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let l2 = ListArray::try_new(
            PrimitiveArray::from_iter([5, 6]).into_array(),
            PrimitiveArray::from_iter([0, 2]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let chunked_list = ChunkedArray::try_new(
            vec![l1.clone().into_array(), l2.clone().into_array()],
            List(Arc::new(Primitive(I32, NonNullable)), NonNullable),
        );

        let canon_values = chunked_list.unwrap().to_list().unwrap();

        assert_eq!(canon_values.len(), 2);
        assert_eq!(
            scalar_at(&l1, 0).unwrap(),
            scalar_at(&canon_values, 0).unwrap()
        );
        assert_eq!(
            scalar_at(&l2, 0).unwrap(),
            scalar_at(&canon_values, 1).unwrap()
        );
    }
}