vortex_layout/layouts/
compact.rs1use vortex_array::arrays::{
5 ExtensionArray, FixedSizeListArray, ListArray, PrimitiveArray, StructArray, narrowed_decimal,
6};
7use vortex_array::vtable::ValidityHelper;
8use vortex_array::{Array, ArrayRef, Canonical, IntoArray};
9use vortex_decimal_byte_parts::DecimalBytePartsArray;
10use vortex_dtype::PType;
11use vortex_error::VortexResult;
12use vortex_pco::PcoArray;
13use vortex_scalar::DecimalValueType;
14use vortex_zstd::ZstdArray;
15
16fn is_pco_number_type(ptype: PType) -> bool {
17 matches!(
18 ptype,
19 PType::F16
20 | PType::F32
21 | PType::F64
22 | PType::I16
23 | PType::I32
24 | PType::I64
25 | PType::U16
26 | PType::U32
27 | PType::U64
28 )
29}
30
/// Settings for a compressor that encodes canonical arrays with pco or zstd.
///
/// Start from `CompactCompressor::default()` and adjust individual settings
/// with the `with_*` builder methods.
#[derive(Clone, Debug)]
pub struct CompactCompressor {
    /// Compression level passed to `PcoArray::from_primitive`.
    pco_level: usize,
    /// Compression level passed to the `ZstdArray` constructors.
    zstd_level: i32,
    /// Page-size argument passed to both the pco and the zstd constructors.
    values_per_page: usize,
}
40
41impl CompactCompressor {
42 pub fn with_pco_level(mut self, level: usize) -> Self {
43 self.pco_level = level;
44 self
45 }
46
47 pub fn with_zstd_level(mut self, level: i32) -> Self {
48 self.zstd_level = level;
49 self
50 }
51
52 pub fn with_values_per_page(mut self, values_per_page: usize) -> Self {
59 self.values_per_page = values_per_page;
60 self
61 }
62
63 pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
64 self.compress_canonical(array.to_canonical())
65 }
66
67 pub fn compress_canonical(&self, canonical: Canonical) -> VortexResult<ArrayRef> {
69 let uncompressed_nbytes = canonical.as_ref().nbytes();
70 let compressed = match &canonical {
71 Canonical::Primitive(primitive) => {
73 let ptype = primitive.ptype();
75
76 if is_pco_number_type(ptype) {
77 let pco_array =
78 PcoArray::from_primitive(primitive, self.pco_level, self.values_per_page)?;
79 pco_array.into_array()
80 } else {
81 let zstd_array = ZstdArray::from_primitive(
82 primitive,
83 self.zstd_level,
84 self.values_per_page,
85 )?;
86 zstd_array.into_array()
87 }
88 }
89 Canonical::Decimal(decimal) => {
90 let decimal = narrowed_decimal(decimal.clone());
91 let validity = decimal.validity();
92 let int_values = match decimal.values_type() {
93 DecimalValueType::I8 => {
94 PrimitiveArray::new(decimal.buffer::<i8>(), validity.clone())
95 }
96 DecimalValueType::I16 => {
97 PrimitiveArray::new(decimal.buffer::<i16>(), validity.clone())
98 }
99 DecimalValueType::I32 => {
100 PrimitiveArray::new(decimal.buffer::<i32>(), validity.clone())
101 }
102 DecimalValueType::I64 => {
103 PrimitiveArray::new(decimal.buffer::<i64>(), validity.clone())
104 }
105 _ => {
106 return Ok(canonical.into_array());
108 }
109 };
110 let compressed = self.compress_canonical(Canonical::Primitive(int_values))?;
111 DecimalBytePartsArray::try_new(compressed, decimal.decimal_dtype())?.to_array()
112 }
113 Canonical::VarBinView(vbv) => {
114 ZstdArray::from_var_bin_view(vbv, self.zstd_level, self.values_per_page)?
116 .into_array()
117 }
118 Canonical::Struct(struct_array) => {
119 let fields = struct_array
121 .fields()
122 .iter()
123 .map(|field| self.compress(field))
124 .collect::<VortexResult<Vec<_>>>()?;
125
126 StructArray::try_new(
127 struct_array.names().clone(),
128 fields,
129 struct_array.len(),
130 struct_array.validity().clone(),
131 )?
132 .into_array()
133 }
134 Canonical::List(list_array) => {
135 let compressed_elems = self.compress(list_array.elements())?;
137 let compressed_offsets = self.compress(list_array.offsets())?;
138
139 ListArray::try_new(
140 compressed_elems,
141 compressed_offsets,
142 list_array.validity().clone(),
143 )?
144 .into_array()
145 }
146 Canonical::FixedSizeList(list_array) => {
147 let compressed_elems = self.compress(list_array.elements())?;
149
150 FixedSizeListArray::try_new(
151 compressed_elems,
152 list_array.list_size(),
153 list_array.validity().clone(),
154 list_array.len(),
155 )?
156 .into_array()
157 }
158 Canonical::Extension(ext_array) => {
159 let compressed_storage = self.compress(ext_array.storage())?;
161
162 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage).into_array()
163 }
164 _ => return Ok(canonical.into_array()),
165 };
166
167 if compressed.nbytes() >= uncompressed_nbytes {
168 return Ok(canonical.into_array());
169 }
170 Ok(compressed)
171 }
172}
173
174impl Default for CompactCompressor {
175 fn default() -> Self {
176 Self {
177 pco_level: pco::DEFAULT_COMPRESSION_LEVEL,
178 zstd_level: 3,
179 values_per_page: 8192,
184 }
185 }
186}
187
#[cfg(test)]
mod tests {
    use vortex_array::arrays::{PrimitiveArray, StructArray};
    use vortex_array::validity::Validity;
    use vortex_array::{IntoArray, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_dtype::FieldName;

    use super::*;

    /// Compresses a non-nullable struct with `f64`, `i32` and `u8` columns using the
    /// default compressor, canonicalizes the result, and checks that field names,
    /// lengths and every scalar match the input.
    #[test]
    fn test_compact_compressor_struct_with_mixed_types() {
        let field_names: Vec<FieldName> =
            vec!["f64_field".into(), "i32_field".into(), "u8_field".into()];

        // `f64` and `i32` are pco-eligible ptypes; `u8` is not.
        let columns = vec![
            PrimitiveArray::new(buffer![1.0f64, 2.0, 3.0, 4.0, 5.0], Validity::NonNullable)
                .into_array(),
            PrimitiveArray::new(buffer![10i32, 20, 30, 40, 50], Validity::NonNullable)
                .into_array(),
            PrimitiveArray::new(buffer![11u8, 22, 33, 44, 55], Validity::NonNullable)
                .into_array(),
        ];
        let n_rows = columns[0].len();

        let input = StructArray::try_new(
            field_names.clone().into(),
            columns.clone(),
            n_rows,
            Validity::NonNullable,
        )
        .unwrap();

        let compressor = CompactCompressor::default();
        let compressed = compressor.compress(input.as_ref()).unwrap();

        let decoded = compressed.to_canonical().into_array();
        assert_eq!(decoded.len(), n_rows);
        let decoded_struct = decoded.to_struct();

        for (i, name) in decoded_struct.names().iter().enumerate() {
            assert_eq!(name, field_names[i]);

            let actual = decoded_struct.field_by_name(name).unwrap().to_primitive();
            assert_eq!(actual.len(), n_rows);

            let expected = &columns[i];
            for row in 0..n_rows {
                assert_eq!(actual.scalar_at(row), expected.scalar_at(row));
            }
        }
    }
}