vortex_array/arrays/chunked/decode.rs

use vortex_buffer::BufferMut;
use vortex_dtype::{DType, Nullability, PType, StructFields};
use vortex_error::{VortexExpect, VortexResult, vortex_err};

use super::ChunkedArray;
use crate::arrays::{ChunkedVTable, ListArray, PrimitiveArray, StructArray};
use crate::builders::{ArrayBuilder, builder_with_capacity};
use crate::compute::cast;
use crate::validity::Validity;
use crate::vtable::CanonicalVTable;
use crate::{Array as _, ArrayRef, Canonical, IntoArray, ToCanonical};

impl CanonicalVTable<ChunkedVTable> for ChunkedVTable {
    fn canonicalize(array: &ChunkedArray) -> VortexResult<Canonical> {
        // No chunks: return an empty canonical array of the same dtype.
        if array.nchunks() == 0 {
            return Ok(Canonical::empty(array.dtype()));
        }
        // A single chunk can be canonicalized directly.
        if array.nchunks() == 1 {
            return array.chunks()[0].to_canonical();
        }
        match array.dtype() {
            // Structs are swizzled into a struct of chunked field arrays.
            DType::Struct(struct_dtype, _) => {
                let struct_array = swizzle_struct_chunks(
                    array.chunks(),
                    Validity::copy_from_array(array.as_ref())?,
                    struct_dtype,
                )?;
                Ok(Canonical::Struct(struct_array))
            }
            // Lists are repacked into a single list over chunked elements.
            DType::List(elem, _) => Ok(Canonical::List(pack_lists(
                array.chunks(),
                Validity::copy_from_array(array.as_ref())?,
                elem,
            )?)),
            // Everything else is appended chunk-by-chunk into a canonical builder.
            _ => {
                let mut builder = builder_with_capacity(array.dtype(), array.len());
                array.append_to_builder(builder.as_mut())?;
                builder.finish().to_canonical()
            }
        }
    }

    fn append_to_builder(array: &ChunkedArray, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
        for chunk in array.chunks() {
            chunk.append_to_builder(builder)?;
        }
        Ok(())
    }
}
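/// Swizzles struct-typed chunks into a single [`StructArray`]: for each field, the
/// per-chunk field arrays are gathered into one `ChunkedArray`, turning a chunk of
/// structs into a struct of chunked fields.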
fn swizzle_struct_chunks(
    chunks: &[ArrayRef],
    validity: Validity,
    struct_dtype: &StructFields,
) -> VortexResult<StructArray> {
    let len = chunks.iter().map(|chunk| chunk.len()).sum();
    let mut field_arrays = Vec::new();

    for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
        let field_chunks = chunks
            .iter()
            .map(|c| {
                c.to_struct()
                    .vortex_expect("Chunk was not a StructArray")
                    .fields()
                    .get(field_idx)
                    .vortex_expect("Invalid field index")
                    .to_array()
            })
            .collect::<Vec<_>>();
        let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
        field_arrays.push(field_array.into_array());
    }

    StructArray::try_new_with_dtype(field_arrays, struct_dtype.clone(), len, validity)
}
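/// Packs list-typed chunks into a single [`ListArray`]: each chunk's elements are
/// sliced to the range its offsets actually reference, and the offsets are rebased
/// so they index contiguously into the concatenated (chunked) element array.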
fn pack_lists(
    chunks: &[ArrayRef],
    validity: Validity,
    elem_dtype: &DType,
) -> VortexResult<ListArray> {
    let len: usize = chunks.iter().map(|c| c.len()).sum();
    let mut offsets = BufferMut::<i64>::with_capacity(len + 1);
    offsets.push(0);
    let mut elements = Vec::new();

    for chunk in chunks {
        let chunk = chunk.to_list()?;
        let offsets_arr = cast(
            chunk.offsets(),
            &DType::Primitive(PType::I64, Nullability::NonNullable),
        )?
        .to_primitive()?;

        // Slice out only the elements this chunk's offsets actually reference.
        let first_offset_value: usize = usize::try_from(&offsets_arr.scalar_at(0)?)?;
        let last_offset_value: usize =
            usize::try_from(&offsets_arr.scalar_at(offsets_arr.len() - 1)?)?;
        elements.push(
            chunk
                .elements()
                .slice(first_offset_value, last_offset_value)?,
        );

        // Rebase this chunk's offsets onto the end of the offsets gathered so far,
        // dropping the leading offset (it coincides with the previous chunk's last).
        let adjustment_from_previous = *offsets
            .last()
            .ok_or_else(|| vortex_err!("List offsets must have at least one element"))?;
        offsets.extend(
            offsets_arr
                .as_slice::<i64>()
                .iter()
                .skip(1)
                .map(|off| off + adjustment_from_previous - first_offset_value as i64),
        );
    }
    let chunked_elements = ChunkedArray::try_new(elements, elem_dtype.clone())?.into_array();
    let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable);

    ListArray::try_new(chunked_elements, offsets.into_array(), validity)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_dtype::DType::{List, Primitive};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;

    use crate::accessor::ArrayAccessor;
    use crate::arrays::{ChunkedArray, ListArray, PrimitiveArray, StructArray, VarBinViewArray};
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical};

    #[test]
    pub fn pack_nested_structs() {
        let struct_array = StructArray::try_new(
            vec!["a".into()].into(),
            vec![VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array()],
            4,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = struct_array.dtype().clone();
        let chunked = ChunkedArray::try_new(
            vec![
                ChunkedArray::try_new(vec![struct_array.to_array()], dtype.clone())
                    .unwrap()
                    .into_array(),
            ],
            dtype,
        )
        .unwrap()
        .into_array();
        let canonical_struct = chunked.to_struct().unwrap();
        let canonical_varbin = canonical_struct.fields()[0].to_varbinview().unwrap();
        let original_varbin = struct_array.fields()[0].to_varbinview().unwrap();
        let orig_values = original_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let canon_values = canonical_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        assert_eq!(orig_values, canon_values);
    }

    #[test]
    pub fn pack_nested_lists() {
        let l1 = ListArray::try_new(
            PrimitiveArray::from_iter([1, 2, 3, 4]).into_array(),
            PrimitiveArray::from_iter([0, 3]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let l2 = ListArray::try_new(
            PrimitiveArray::from_iter([5, 6]).into_array(),
            PrimitiveArray::from_iter([0, 2]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let chunked_list = ChunkedArray::try_new(
            vec![l1.clone().into_array(), l2.clone().into_array()],
            List(Arc::new(Primitive(I32, NonNullable)), NonNullable),
        );

        let canon_values = chunked_list.unwrap().to_list().unwrap();

        assert_eq!(l1.scalar_at(0).unwrap(), canon_values.scalar_at(0).unwrap());
        assert_eq!(l2.scalar_at(0).unwrap(), canon_values.scalar_at(1).unwrap());
    }
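
    // Added sketch, not part of the original test suite: exercises the `_ =>` builder
    // fallback arm of `canonicalize`, which the struct/list tests above do not cover.
    // The test name `canonicalize_primitive_chunks` is ours; it only relies on the
    // `to_primitive`/`as_slice` usage already seen in `pack_lists` above.
    #[test]
    pub fn canonicalize_primitive_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                PrimitiveArray::from_iter([1i32, 2, 3]).into_array(),
                PrimitiveArray::from_iter([4i32, 5]).into_array(),
            ],
            Primitive(I32, NonNullable),
        )
        .unwrap()
        .into_array();

        // Two primitive chunks should canonicalize into one contiguous primitive array.
        let canonical = chunked.to_primitive().unwrap();
        assert_eq!(canonical.as_slice::<i32>(), &[1, 2, 3, 4, 5]);
    }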
}