vortex_layout/layouts/
compact.rs1use vortex_array::Array;
5use vortex_array::ArrayRef;
6use vortex_array::Canonical;
7use vortex_array::IntoArray;
8use vortex_array::ToCanonical;
9use vortex_array::arrays::ExtensionArray;
10use vortex_array::arrays::FixedSizeListArray;
11use vortex_array::arrays::ListViewArray;
12use vortex_array::arrays::PrimitiveArray;
13use vortex_array::arrays::StructArray;
14use vortex_array::arrays::narrowed_decimal;
15use vortex_array::vtable::ValidityHelper;
16use vortex_decimal_byte_parts::DecimalBytePartsArray;
17use vortex_dtype::PType;
18use vortex_error::VortexResult;
19use vortex_pco::PcoArray;
20use vortex_scalar::DecimalType;
21use vortex_zstd::ZstdArray;
22
23fn is_pco_number_type(ptype: PType) -> bool {
24 matches!(
25 ptype,
26 PType::F16
27 | PType::F32
28 | PType::F64
29 | PType::I16
30 | PType::I32
31 | PType::I64
32 | PType::U16
33 | PType::U32
34 | PType::U64
35 )
36}
37
/// Settings for compressing canonical arrays with pco and zstd.
///
/// Construct it with `Default` and adjust it through the `with_*` builder methods.
#[derive(Clone, Debug)]
pub struct CompactCompressor {
    /// Compression level passed to the pco encoder.
    pco_level: usize,
    /// Compression level passed to the zstd encoder.
    zstd_level: i32,
    /// Forwarded to both the pco and zstd encoders as their `values_per_page` argument.
    values_per_page: usize,
}
47
48impl CompactCompressor {
49 pub fn with_pco_level(mut self, level: usize) -> Self {
50 self.pco_level = level;
51 self
52 }
53
54 pub fn with_zstd_level(mut self, level: i32) -> Self {
55 self.zstd_level = level;
56 self
57 }
58
59 pub fn with_values_per_page(mut self, values_per_page: usize) -> Self {
66 self.values_per_page = values_per_page;
67 self
68 }
69
70 pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
71 self.compress_canonical(array.to_canonical())
72 }
73
74 pub fn compress_canonical(&self, canonical: Canonical) -> VortexResult<ArrayRef> {
76 let uncompressed_nbytes = canonical.as_ref().nbytes();
77 let compressed = match &canonical {
78 Canonical::Primitive(primitive) => {
80 let ptype = primitive.ptype();
82
83 if is_pco_number_type(ptype) {
84 let pco_array =
85 PcoArray::from_primitive(primitive, self.pco_level, self.values_per_page)?;
86 pco_array.into_array()
87 } else {
88 let zstd_array = ZstdArray::from_primitive(
89 primitive,
90 self.zstd_level,
91 self.values_per_page,
92 )?;
93 zstd_array.into_array()
94 }
95 }
96 Canonical::Decimal(decimal) => {
97 let decimal = narrowed_decimal(decimal.clone());
98 let validity = decimal.validity();
99 let int_values = match decimal.values_type() {
100 DecimalType::I8 => {
101 PrimitiveArray::new(decimal.buffer::<i8>(), validity.clone())
102 }
103 DecimalType::I16 => {
104 PrimitiveArray::new(decimal.buffer::<i16>(), validity.clone())
105 }
106 DecimalType::I32 => {
107 PrimitiveArray::new(decimal.buffer::<i32>(), validity.clone())
108 }
109 DecimalType::I64 => {
110 PrimitiveArray::new(decimal.buffer::<i64>(), validity.clone())
111 }
112 _ => {
113 return Ok(canonical.into_array());
115 }
116 };
117 let compressed = self.compress_canonical(Canonical::Primitive(int_values))?;
118 DecimalBytePartsArray::try_new(compressed, decimal.decimal_dtype())?.to_array()
119 }
120 Canonical::VarBinView(vbv) => {
121 ZstdArray::from_var_bin_view(vbv, self.zstd_level, self.values_per_page)?
123 .into_array()
124 }
125 Canonical::Struct(struct_array) => {
126 let fields = struct_array
128 .fields()
129 .iter()
130 .map(|field| self.compress(field))
131 .collect::<VortexResult<Vec<_>>>()?;
132
133 StructArray::try_new(
134 struct_array.names().clone(),
135 fields,
136 struct_array.len(),
137 struct_array.validity().clone(),
138 )?
139 .into_array()
140 }
141 Canonical::List(listview) => {
142 let compressed_elems = self.compress(listview.elements())?;
143
144 let compressed_offsets =
147 self.compress(&listview.offsets().to_primitive().narrow()?.into_array())?;
148 let compressed_sizes =
149 self.compress(&listview.sizes().to_primitive().narrow()?.into_array())?;
150
151 unsafe {
156 ListViewArray::new_unchecked(
157 compressed_elems,
158 compressed_offsets,
159 compressed_sizes,
160 listview.validity().clone(),
161 )
162 .with_zero_copy_to_list(listview.is_zero_copy_to_list())
163 }
164 .into_array()
165 }
166 Canonical::FixedSizeList(fsl) => {
167 let compressed_elems = self.compress(fsl.elements())?;
168
169 FixedSizeListArray::try_new(
170 compressed_elems,
171 fsl.list_size(),
172 fsl.validity().clone(),
173 fsl.len(),
174 )?
175 .into_array()
176 }
177 Canonical::Extension(ext_array) => {
178 let compressed_storage = self.compress(ext_array.storage())?;
179
180 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage).into_array()
181 }
182 _ => return Ok(canonical.into_array()),
183 };
184
185 if compressed.nbytes() >= uncompressed_nbytes {
186 return Ok(canonical.into_array());
187 }
188 Ok(compressed)
189 }
190}
191
192impl Default for CompactCompressor {
193 fn default() -> Self {
194 Self {
195 pco_level: pco::DEFAULT_COMPRESSION_LEVEL,
196 zstd_level: 3,
197 values_per_page: 8192,
202 }
203 }
204}
205
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::StructArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;
    use vortex_dtype::FieldName;

    use super::*;

    /// Compresses a struct whose columns hit both the pco path (`f64`, `i32`) and the
    /// zstd path (`u8`). It then checks that decompression restores every field's
    /// name, length and contents.
    #[test]
    fn test_compact_compressor_struct_with_mixed_types() {
        let field_names: Vec<FieldName> =
            vec!["f64_field".into(), "i32_field".into(), "u8_field".into()];
        let columns = vec![
            PrimitiveArray::new(buffer![1.0f64, 2.0, 3.0, 4.0, 5.0], Validity::NonNullable)
                .into_array(),
            PrimitiveArray::new(buffer![10i32, 20, 30, 40, 50], Validity::NonNullable)
                .into_array(),
            PrimitiveArray::new(buffer![11u8, 22, 33, 44, 55], Validity::NonNullable)
                .into_array(),
        ];
        let n_rows = columns[0].len();

        let input = StructArray::try_new(
            field_names.clone().into(),
            columns.clone(),
            n_rows,
            Validity::NonNullable,
        )
        .expect("all struct fields have the same length");

        let compressor = CompactCompressor::default();
        let compressed = compressor
            .compress(input.as_ref())
            .expect("compressing the struct succeeds");

        let roundtripped = compressed.to_canonical().into_array();
        assert_eq!(roundtripped.len(), n_rows);
        let roundtripped_struct = roundtripped.to_struct();

        for (i, name) in roundtripped_struct.names().iter().enumerate() {
            assert_eq!(name, field_names[i]);

            let field = roundtripped_struct.field_by_name(name).unwrap().clone();
            assert_eq!(field.len(), n_rows);
            assert_arrays_eq!(field.as_ref(), columns[i].as_ref());
        }
    }
}