vortex_array/arrays/chunked/decode.rs

use vortex_buffer::BufferMut;
use vortex_dtype::{DType, Nullability, PType, StructFields};
use vortex_error::{VortexExpect, VortexResult, vortex_err};

use super::ChunkedArray;
use crate::arrays::{ChunkedVTable, ListArray, PrimitiveArray, StructArray};
use crate::builders::{ArrayBuilder, builder_with_capacity};
use crate::compute::cast;
use crate::validity::Validity;
use crate::vtable::CanonicalVTable;
use crate::{Array as _, ArrayRef, Canonical, IntoArray, ToCanonical};

impl CanonicalVTable<ChunkedVTable> for ChunkedVTable {
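    // Decodes a ChunkedArray into its canonical form. Zero-chunk and single-chunk arrays take
    // a fast path; struct and list arrays are repacked chunk-by-chunk by the helpers below;
    // every other dtype is appended into a canonical builder sized for the full array.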
    fn canonicalize(array: &ChunkedArray) -> VortexResult<Canonical> {
        if array.nchunks() == 0 {
            return Ok(Canonical::empty(array.dtype()));
        }
        if array.nchunks() == 1 {
            return array.chunks()[0].to_canonical();
        }

        match array.dtype() {
            DType::Struct(struct_dtype, _) => {
                let struct_array = swizzle_struct_chunks(
                    array.chunks(),
                    Validity::copy_from_array(array.as_ref())?,
                    struct_dtype,
                )?;
                Ok(Canonical::Struct(struct_array))
            }
            DType::List(elem_dtype, _) => Ok(Canonical::List(pack_lists(
                array.chunks(),
                Validity::copy_from_array(array.as_ref())?,
                elem_dtype,
            )?)),
            _ => {
                let mut builder = builder_with_capacity(array.dtype(), array.len());
                array.append_to_builder(builder.as_mut())?;
                builder.finish().to_canonical()
            }
        }
    }

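    // Appends each chunk, in order, to the provided builder.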
    fn append_to_builder(array: &ChunkedArray, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
        for chunk in array.chunks() {
            chunk.append_to_builder(builder)?;
        }
        Ok(())
    }
}

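/// Repacks the chunks of a struct-typed ChunkedArray into a single [`StructArray`]: for each
/// field, the corresponding child is pulled out of every chunk and the pieces are wrapped in a
/// per-field [`ChunkedArray`]. Each chunk is expected to canonicalize to a [`StructArray`].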
fn swizzle_struct_chunks(
    chunks: &[ArrayRef],
    validity: Validity,
    struct_dtype: &StructFields,
) -> VortexResult<StructArray> {
    let len = chunks.iter().map(|chunk| chunk.len()).sum();
    let mut field_arrays = Vec::new();

    for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
        let field_chunks = chunks
            .iter()
            .map(|c| {
                c.to_struct()
                    .vortex_expect("Chunk was not a StructArray")
                    .fields()
                    .get(field_idx)
                    .vortex_expect("Invalid field index")
                    .to_array()
            })
            .collect::<Vec<_>>();
        let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
        field_arrays.push(field_array.into_array());
    }

    StructArray::try_new_with_dtype(field_arrays, struct_dtype.clone(), len, validity)
}

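/// Repacks the chunks of a list-typed ChunkedArray into a single [`ListArray`]: element buffers
/// are concatenated as a [`ChunkedArray`], while each chunk's offsets are cast to `i64` and
/// rebased so they continue from where the previous chunk's offsets ended.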
fn pack_lists(
    chunks: &[ArrayRef],
    validity: Validity,
    elem_dtype: &DType,
) -> VortexResult<ListArray> {
    let len: usize = chunks.iter().map(|c| c.len()).sum();
    let mut offsets = BufferMut::<i64>::with_capacity(len + 1);
    offsets.push(0);
    let mut elements = Vec::new();

    for chunk in chunks {
        let chunk = chunk.to_list()?;
        let offsets_arr = cast(
            chunk.offsets(),
            &DType::Primitive(PType::I64, Nullability::NonNullable),
        )?
        .to_primitive()?;

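        // A chunk's offsets need not start at zero, so only the elements between the chunk's
        // first and last offsets are carried over.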
        let first_offset_value: usize = usize::try_from(&offsets_arr.scalar_at(0))?;
        let last_offset_value: usize =
            usize::try_from(&offsets_arr.scalar_at(offsets_arr.len() - 1))?;
        elements.push(
            chunk
                .elements()
                .slice(first_offset_value, last_offset_value),
        );
        let adjustment_from_previous = *offsets
            .last()
            .ok_or_else(|| vortex_err!("List offsets must have at least one element"))?;
        offsets.extend_trusted(
            offsets_arr
                .as_slice::<i64>()
                .iter()
                .skip(1)
                .map(|off| off + adjustment_from_previous - first_offset_value as i64),
        );
    }
    let chunked_elements = ChunkedArray::try_new(elements, elem_dtype.clone())?.into_array();
    let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable);

    ListArray::try_new(chunked_elements, offsets.into_array(), validity)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_dtype::DType::{List, Primitive};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;

    use crate::accessor::ArrayAccessor;
    use crate::arrays::{ChunkedArray, ListArray, PrimitiveArray, StructArray, VarBinViewArray};
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical};

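    // Canonicalizing a nested chunked struct array should preserve the values of its fields.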
    #[test]
    pub fn pack_nested_structs() {
        let struct_array = StructArray::try_new(
            vec!["a".into()].into(),
            vec![VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array()],
            4,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = struct_array.dtype().clone();
        let chunked = ChunkedArray::try_new(
            vec![
                ChunkedArray::try_new(vec![struct_array.to_array()], dtype.clone())
                    .unwrap()
                    .into_array(),
            ],
            dtype,
        )
        .unwrap()
        .into_array();
        let canonical_struct = chunked.to_struct().unwrap();
        let canonical_varbin = canonical_struct.fields()[0].to_varbinview().unwrap();
        let original_varbin = struct_array.fields()[0].to_varbinview().unwrap();
        let orig_values = original_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let canon_values = canonical_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        assert_eq!(orig_values, canon_values);
    }

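    // Canonicalizing two list chunks should yield a single list array whose entries line up
    // with the original chunks.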
    #[test]
    pub fn pack_nested_lists() {
        let l1 = ListArray::try_new(
            PrimitiveArray::from_iter([1, 2, 3, 4]).into_array(),
            PrimitiveArray::from_iter([0, 3]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let l2 = ListArray::try_new(
            PrimitiveArray::from_iter([5, 6]).into_array(),
            PrimitiveArray::from_iter([0, 2]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let chunked_list = ChunkedArray::try_new(
            vec![l1.clone().into_array(), l2.clone().into_array()],
            List(Arc::new(Primitive(I32, NonNullable)), NonNullable),
        );

        let canon_values = chunked_list.unwrap().to_list().unwrap();

        assert_eq!(l1.scalar_at(0), canon_values.scalar_at(0));
        assert_eq!(l2.scalar_at(0), canon_values.scalar_at(1));
    }
}