// vortex_array/arrays/chunked/decode.rs

use vortex_buffer::BufferMut;
use vortex_dtype::{DType, Nullability, PType, StructFields};
use vortex_error::{VortexExpect, VortexUnwrap, vortex_err};

use super::ChunkedArray;
use crate::arrays::{ChunkedVTable, ListArray, PrimitiveArray, StructArray};
use crate::builders::{ArrayBuilder, builder_with_capacity};
use crate::compute::cast;
use crate::validity::Validity;
use crate::vtable::CanonicalVTable;
use crate::{Array as _, ArrayRef, Canonical, IntoArray, ToCanonical};

impl CanonicalVTable<ChunkedVTable> for ChunkedVTable {
    fn canonicalize(array: &ChunkedArray) -> Canonical {
        if array.nchunks() == 0 {
            return Canonical::empty(array.dtype());
        }
        if array.nchunks() == 1 {
            return array.chunks()[0].to_canonical();
        }

        match array.dtype() {
            DType::Struct(struct_dtype, _) => {
                let struct_array = swizzle_struct_chunks(
                    array.chunks(),
                    Validity::copy_from_array(array.as_ref()),
                    struct_dtype,
                );
                Canonical::Struct(struct_array)
            }
            DType::List(elem_dtype, _) => Canonical::List(pack_lists(
                array.chunks(),
                Validity::copy_from_array(array.as_ref()),
                elem_dtype,
            )),
            _ => {
                let mut builder = builder_with_capacity(array.dtype(), array.len());
                array.append_to_builder(builder.as_mut());
                builder.finish_into_canonical()
            }
        }
    }

    fn append_to_builder(array: &ChunkedArray, builder: &mut dyn ArrayBuilder) {
        for chunk in array.chunks() {
            chunk.append_to_builder(builder);
        }
    }
}

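/// Transpose struct-typed chunks into a single [`StructArray`]: each output field is itself a
/// [`ChunkedArray`] assembled from that field's column in every input chunk.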
fn swizzle_struct_chunks(
    chunks: &[ArrayRef],
    validity: Validity,
    struct_dtype: &StructFields,
) -> StructArray {
    let len = chunks.iter().map(|chunk| chunk.len()).sum();
    let mut field_arrays = Vec::new();

    for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
        let field_chunks = chunks
            .iter()
            .map(|c| {
                c.to_struct()
                    .fields()
                    .get(field_idx)
                    .vortex_expect("Invalid field index")
                    .to_array()
            })
            .collect::<Vec<_>>();
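        // Every chunk shares the parent's struct dtype, so each field chunk collected here has
        // `field_dtype`.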
        let field_array =
            unsafe { ChunkedArray::new_unchecked(field_chunks, field_dtype.clone()) };
        field_arrays.push(field_array.into_array());
    }

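    // Each field's chunked array covers the same chunks as the input, so every field has length
    // `len` and the dtype declared for it in `struct_dtype`.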
    unsafe { StructArray::new_unchecked(field_arrays, struct_dtype.clone(), len, validity) }
}

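/// Concatenate list-typed chunks into a single [`ListArray`]: slice each chunk's elements down to
/// the range its offsets actually reference, then rebase those offsets onto the running total of
/// the packed elements.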
fn pack_lists(chunks: &[ArrayRef], validity: Validity, elem_dtype: &DType) -> ListArray {
    let len: usize = chunks.iter().map(|c| c.len()).sum();
    let mut offsets = BufferMut::<u64>::with_capacity(len + 1);
    offsets.push(0);
    let mut elements = Vec::new();

    for chunk in chunks {
        let chunk = chunk.to_list();
        let offsets_arr = cast(
            chunk.offsets(),
            &DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .vortex_expect("Must fit array offsets in u64")
        .to_primitive();

        let first_offset_value: usize =
            usize::try_from(&offsets_arr.scalar_at(0)).vortex_expect("Offset must be a usize");
        let last_offset_value: usize =
            usize::try_from(&offsets_arr.scalar_at(offsets_arr.len() - 1))
                .vortex_expect("Offset must be a usize");
        elements.push(
            chunk
                .elements()
                .slice(first_offset_value..last_offset_value),
        );

        let adjustment_from_previous = *offsets
            .last()
            .ok_or_else(|| vortex_err!("List offsets must have at least one element"))
            .vortex_unwrap();
        offsets.extend_trusted(
            offsets_arr
                .as_slice::<u64>()
                .iter()
                .skip(1)
                .map(|off| off + adjustment_from_previous - first_offset_value as u64),
        );
    }
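    // All sliced element arrays come from list chunks whose element dtype is `elem_dtype`.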
    let chunked_elements =
        unsafe { ChunkedArray::new_unchecked(elements, elem_dtype.clone()) }.into_array();
    let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable);

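    // The packed offsets start at 0, are non-nullable u64, and were rebased chunk by chunk onto
    // the running element count, so they index into `chunked_elements`.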
    unsafe { ListArray::new_unchecked(chunked_elements, offsets.into_array(), validity) }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::buffer;
    use vortex_dtype::DType::{List, Primitive};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;

    use crate::accessor::ArrayAccessor;
    use crate::arrays::{ChunkedArray, ListArray, StructArray, VarBinViewArray};
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical};

    #[test]
    pub fn pack_nested_structs() {
        let struct_array = StructArray::try_new(
            ["a"].into(),
            vec![VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array()],
            4,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = struct_array.dtype().clone();
        let chunked = ChunkedArray::try_new(
            vec![
                ChunkedArray::try_new(vec![struct_array.to_array()], dtype.clone())
                    .unwrap()
                    .into_array(),
            ],
            dtype,
        )
        .unwrap()
        .into_array();
        let canonical_struct = chunked.to_struct();
        let canonical_varbin = canonical_struct.fields()[0].to_varbinview();
        let original_varbin = struct_array.fields()[0].to_varbinview();
        let orig_values = original_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let canon_values = canonical_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        assert_eq!(orig_values, canon_values);
    }

    #[test]
    pub fn pack_nested_lists() {
        let l1 = ListArray::try_new(
            buffer![1, 2, 3, 4].into_array(),
            buffer![0, 3].into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let l2 = ListArray::try_new(
            buffer![5, 6].into_array(),
            buffer![0, 2].into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let chunked_list = ChunkedArray::try_new(
            vec![l1.clone().into_array(), l2.clone().into_array()],
            List(Arc::new(Primitive(I32, NonNullable)), NonNullable),
        );

        let canon_values = chunked_list.unwrap().to_list();

        assert_eq!(l1.scalar_at(0), canon_values.scalar_at(0));
        assert_eq!(l2.scalar_at(0), canon_values.scalar_at(1));
    }
}