vortex_array/arrays/chunked/vtable/
canonical.rs1use vortex_buffer::BufferMut;
5use vortex_dtype::DType;
6use vortex_dtype::Nullability;
7use vortex_dtype::PType;
8use vortex_dtype::StructFields;
9use vortex_error::VortexExpect;
10
11use crate::Array;
12use crate::ArrayRef;
13use crate::Canonical;
14use crate::IntoArray;
15use crate::ToCanonical;
16use crate::arrays::ChunkedArray;
17use crate::arrays::ChunkedVTable;
18use crate::arrays::ListViewArray;
19use crate::arrays::ListViewRebuildMode;
20use crate::arrays::PrimitiveArray;
21use crate::arrays::StructArray;
22use crate::builders::ArrayBuilder;
23use crate::builders::builder_with_capacity;
24use crate::compute::cast;
25use crate::validity::Validity;
26use crate::vtable::CanonicalVTable;
27
28impl CanonicalVTable<ChunkedVTable> for ChunkedVTable {
29 fn canonicalize(array: &ChunkedArray) -> Canonical {
30 if array.nchunks() == 0 {
31 return Canonical::empty(array.dtype());
32 }
33 if array.nchunks() == 1 {
34 return array.chunks()[0].to_canonical();
35 }
36
37 match array.dtype() {
38 DType::Struct(struct_dtype, _) => {
39 let struct_array = pack_struct_chunks(
40 array.chunks(),
41 Validity::copy_from_array(array.as_ref()),
42 struct_dtype,
43 );
44 Canonical::Struct(struct_array)
45 }
46 DType::List(elem_dtype, _) => Canonical::List(swizzle_list_chunks(
47 array.chunks(),
48 Validity::copy_from_array(array.as_ref()),
49 elem_dtype,
50 )),
51 _ => {
52 let mut builder = builder_with_capacity(array.dtype(), array.len());
53 array.append_to_builder(builder.as_mut());
54 builder.finish_into_canonical()
55 }
56 }
57 }
58
59 fn append_to_builder(array: &ChunkedArray, builder: &mut dyn ArrayBuilder) {
60 for chunk in array.chunks() {
61 chunk.append_to_builder(builder);
62 }
63 }
64}
65
66fn pack_struct_chunks(
71 chunks: &[ArrayRef],
72 validity: Validity,
73 struct_dtype: &StructFields,
74) -> StructArray {
75 let len = chunks.iter().map(|chunk| chunk.len()).sum();
76 let mut field_arrays = Vec::new();
77
78 for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
79 let field_chunks = chunks
80 .iter()
81 .map(|c| {
82 c.to_struct()
83 .fields()
84 .get(field_idx)
85 .vortex_expect("Invalid field index")
86 .to_array()
87 })
88 .collect::<Vec<_>>();
89
90 let field_array = unsafe { ChunkedArray::new_unchecked(field_chunks, field_dtype.clone()) };
93 field_arrays.push(field_array.into_array());
94 }
95
96 unsafe { StructArray::new_unchecked(field_arrays, struct_dtype.clone(), len, validity) }
99}
100
101fn swizzle_list_chunks(
107 chunks: &[ArrayRef],
108 validity: Validity,
109 elem_dtype: &DType,
110) -> ListViewArray {
111 let len: usize = chunks.iter().map(|c| c.len()).sum();
112
113 assert_eq!(
114 chunks[0]
115 .dtype()
116 .as_list_element_opt()
117 .vortex_expect("DType was somehow not a list")
118 .as_ref(),
119 elem_dtype
120 );
121
122 let mut list_elements_chunks = Vec::with_capacity(chunks.len());
126 let mut num_elements = 0;
127
128 let mut offsets = BufferMut::<u64>::with_capacity(len);
133 let mut sizes = BufferMut::<u64>::with_capacity(len);
134
135 for chunk in chunks {
136 let chunk_array = chunk.to_listview();
137 let chunk_array = chunk_array.rebuild(ListViewRebuildMode::MakeExact);
140
141 list_elements_chunks.push(chunk_array.elements().clone());
143
144 let offsets_arr = cast(
146 chunk_array.offsets(),
147 &DType::Primitive(PType::U64, Nullability::NonNullable),
148 )
149 .vortex_expect("Must be able to fit array offsets in u64")
150 .to_primitive();
151
152 let sizes_arr = cast(
153 chunk_array.sizes(),
154 &DType::Primitive(PType::U64, Nullability::NonNullable),
155 )
156 .vortex_expect("Must be able to fit array offsets in u64")
157 .to_primitive();
158
159 let offsets_slice = offsets_arr.as_slice::<u64>();
160 let sizes_slice = sizes_arr.as_slice::<u64>();
161
162 offsets.extend(offsets_slice.iter().map(|o| o + num_elements));
164 sizes.extend(sizes_slice);
165
166 num_elements += chunk_array.elements().len() as u64;
167 }
168
169 let chunked_elements =
171 unsafe { ChunkedArray::new_unchecked(list_elements_chunks, elem_dtype.clone()) }
172 .into_array();
173
174 let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable).into_array();
175 let sizes = PrimitiveArray::new(sizes.freeze(), Validity::NonNullable).into_array();
176
177 unsafe {
185 ListViewArray::new_unchecked(chunked_elements, offsets, sizes, validity)
186 .with_zero_copy_to_list(true)
187 }
188}
189
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::buffer;
    use vortex_dtype::DType::List;
    use vortex_dtype::DType::Primitive;
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;

    use crate::IntoArray;
    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::arrays::ChunkedArray;
    use crate::arrays::ListArray;
    use crate::arrays::StructArray;
    use crate::arrays::VarBinViewArray;
    use crate::validity::Validity;

    /// A struct array wrapped in two levels of chunking must canonicalize back to a struct
    /// whose string field holds the same values as the original.
    #[test]
    pub fn pack_nested_structs() {
        let strings = VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array();
        let struct_array =
            StructArray::try_new(["a"].into(), vec![strings], 4, Validity::NonNullable).unwrap();
        let dtype = struct_array.dtype().clone();

        // Chunked array nested inside another chunked array.
        let inner = ChunkedArray::try_new(vec![struct_array.to_array()], dtype.clone())
            .unwrap()
            .into_array();
        let outer = ChunkedArray::try_new(vec![inner], dtype)
            .unwrap()
            .into_array();

        let canonical_struct = outer.to_struct();
        let canonical_varbin = canonical_struct.fields()[0].to_varbinview();
        let original_varbin = struct_array.fields()[0].to_varbinview();

        let expected = original_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>());
        let actual = canonical_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>());
        assert_eq!(expected, actual);
    }

    /// Two single-list chunks must canonicalize to a list view whose entries match the
    /// first list of each chunk, in chunk order.
    #[test]
    pub fn pack_nested_lists() {
        let first = ListArray::try_new(
            buffer![1, 2, 3, 4].into_array(),
            buffer![0, 3].into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let second = ListArray::try_new(
            buffer![5, 6].into_array(),
            buffer![0, 2].into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let list_dtype = List(Arc::new(Primitive(I32, NonNullable)), NonNullable);
        let list_chunks = vec![first.clone().into_array(), second.clone().into_array()];
        let chunked_list = ChunkedArray::try_new(list_chunks, list_dtype);

        let canonical = chunked_list.unwrap().to_listview();

        for (expected, idx) in [(first.scalar_at(0), 0), (second.scalar_at(0), 1)] {
            assert_eq!(expected, canonical.scalar_at(idx));
        }
    }
}