vortex_array/arrays/chunked/
canonical.rs1use std::sync::Arc;
2
3use vortex_buffer::BufferMut;
4use vortex_dtype::{DType, Nullability, PType, StructDType};
5use vortex_error::{VortexExpect, VortexResult, vortex_err};
6
7use super::ChunkedArray;
8use crate::arrays::{ListArray, PrimitiveArray, StructArray};
9use crate::builders::{ArrayBuilder, builder_with_capacity};
10use crate::compute::{scalar_at, slice, try_cast};
11use crate::validity::Validity;
12use crate::{Array as _, ArrayCanonicalImpl, ArrayRef, Canonical, ToCanonical};
13
14impl ArrayCanonicalImpl for ChunkedArray {
15 fn _to_canonical(&self) -> VortexResult<Canonical> {
16 match self.dtype() {
17 DType::Struct(struct_dtype, _) => {
18 let struct_array = swizzle_struct_chunks(
19 self.chunks(),
20 Validity::copy_from_array(self)?,
21 struct_dtype,
22 )?;
23 Ok(Canonical::Struct(struct_array))
24 }
25 DType::List(elem, _) => Ok(Canonical::List(pack_lists(
26 self.chunks(),
27 Validity::copy_from_array(self)?,
28 elem,
29 )?)),
30 _ => {
31 let mut builder = builder_with_capacity(self.dtype(), self.len());
32 self.append_to_builder(builder.as_mut())?;
33 builder.finish().to_canonical()
34 }
35 }
36 }
37
38 fn _append_to_builder(&self, builder: &mut dyn ArrayBuilder) -> VortexResult<()> {
39 for chunk in self.chunks() {
40 chunk.append_to_builder(builder)?;
41 }
42 Ok(())
43 }
44}
45
46fn swizzle_struct_chunks(
49 chunks: &[ArrayRef],
50 validity: Validity,
51 struct_dtype: &Arc<StructDType>,
52) -> VortexResult<StructArray> {
53 let len = chunks.iter().map(|chunk| chunk.len()).sum();
54 let mut field_arrays = Vec::new();
55
56 for (field_idx, field_dtype) in struct_dtype.fields().enumerate() {
57 let field_chunks = chunks
58 .iter()
59 .map(|c| {
60 c.as_struct_typed()
61 .vortex_expect("Chunk was not a StructArray")
62 .maybe_null_field_by_idx(field_idx)
63 .vortex_expect("Invalid chunked array")
64 })
65 .collect::<Vec<_>>();
66 let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
67 field_arrays.push(field_array.into_array());
68 }
69
70 StructArray::try_new_with_dtype(field_arrays, struct_dtype.clone(), len, validity)
71}
72
73fn pack_lists(
74 chunks: &[ArrayRef],
75 validity: Validity,
76 elem_dtype: &DType,
77) -> VortexResult<ListArray> {
78 let len: usize = chunks.iter().map(|c| c.len()).sum();
79 let mut offsets = BufferMut::<i64>::with_capacity(len + 1);
80 offsets.push(0);
81 let mut elements = Vec::new();
82
83 for chunk in chunks {
84 let chunk = chunk.to_list()?;
85 let offsets_arr = try_cast(
87 chunk.offsets(),
88 &DType::Primitive(PType::I64, Nullability::NonNullable),
89 )?
90 .to_primitive()?;
91
92 let first_offset_value: usize = usize::try_from(&scalar_at(&offsets_arr, 0)?)?;
93 let last_offset_value: usize =
94 usize::try_from(&scalar_at(&offsets_arr, offsets_arr.len() - 1)?)?;
95 elements.push(slice(
96 chunk.elements(),
97 first_offset_value,
98 last_offset_value,
99 )?);
100
101 let adjustment_from_previous = *offsets
102 .last()
103 .ok_or_else(|| vortex_err!("List offsets must have at least one element"))?;
104 offsets.extend(
105 offsets_arr
106 .as_slice::<i64>()
107 .iter()
108 .skip(1)
109 .map(|off| off + adjustment_from_previous - first_offset_value as i64),
110 );
111 }
112 let chunked_elements = ChunkedArray::try_new(elements, elem_dtype.clone())?.into_array();
113 let offsets = PrimitiveArray::new(offsets.freeze(), Validity::NonNullable);
114
115 ListArray::try_new(chunked_elements, offsets.into_array(), validity)
116}
117
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_dtype::DType::{List, Primitive};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::array::Array;
    use crate::arrays::{ChunkedArray, ListArray, PrimitiveArray, StructArray, VarBinViewArray};
    use crate::compute::scalar_at;
    use crate::validity::Validity;
    use crate::variants::StructArrayTrait;

    /// A chunked array whose single chunk is itself a chunked struct array must
    /// canonicalize to a struct whose field holds the original string values.
    #[test]
    pub fn pack_nested_structs() {
        let strings = VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"]).into_array();
        let source = StructArray::try_new(
            vec!["a".into()].into(),
            vec![strings],
            4,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = source.dtype().clone();

        // Nest the struct array inside two levels of chunking.
        let inner = ChunkedArray::try_new(vec![source.to_array()], dtype.clone())
            .unwrap()
            .into_array();
        let outer = ChunkedArray::try_new(vec![inner], dtype)
            .unwrap()
            .into_array();

        let canonical = outer.to_struct().unwrap();
        let actual_field = canonical
            .maybe_null_field_by_idx(0)
            .unwrap()
            .to_varbinview()
            .unwrap();
        let expected_field = source
            .maybe_null_field_by_idx(0)
            .unwrap()
            .to_varbinview()
            .unwrap();

        let expected = expected_field
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let actual = actual_field
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        assert_eq!(expected, actual);
    }

    /// Two single-list chunks must canonicalize to one list array whose entries
    /// equal the original lists, in chunk order.
    #[test]
    pub fn pack_nested_lists() {
        // The first chunk's offsets reference only three of its four elements.
        let first = ListArray::try_new(
            PrimitiveArray::from_iter([1, 2, 3, 4]).into_array(),
            PrimitiveArray::from_iter([0, 3]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let second = ListArray::try_new(
            PrimitiveArray::from_iter([5, 6]).into_array(),
            PrimitiveArray::from_iter([0, 2]).into_array(),
            Validity::NonNullable,
        )
        .unwrap();

        let list_dtype = List(Arc::new(Primitive(I32, NonNullable)), NonNullable);
        let chunks = vec![first.clone().into_array(), second.clone().into_array()];
        let packed = ChunkedArray::try_new(chunks, list_dtype)
            .unwrap()
            .to_list()
            .unwrap();

        assert_eq!(
            scalar_at(&first, 0).unwrap(),
            scalar_at(&packed, 0).unwrap()
        );
        assert_eq!(
            scalar_at(&second, 0).unwrap(),
            scalar_at(&packed, 1).unwrap()
        );
    }
}