1use vortex_array::Array;
7use vortex_array::ArrayRef;
8use vortex_array::Canonical;
9use vortex_array::IntoArray;
10use vortex_array::ToCanonical;
11use vortex_array::arrays::ConstantArray;
12use vortex_array::arrays::ExtensionArray;
13use vortex_array::arrays::FixedSizeListArray;
14use vortex_array::arrays::ListArray;
15use vortex_array::arrays::ListViewArray;
16use vortex_array::arrays::StructArray;
17use vortex_array::arrays::TemporalArray;
18use vortex_array::arrays::list_from_list_view;
19use vortex_array::compute::Cost;
20use vortex_array::compute::IsConstantOpts;
21use vortex_array::compute::is_constant_opts;
22use vortex_array::vtable::ValidityHelper;
23use vortex_dtype::DType;
24use vortex_dtype::Nullability;
25use vortex_dtype::datetime::TemporalMetadata;
26use vortex_error::VortexResult;
27
28use crate::BtrBlocksCompressorBuilder;
29use crate::CompressorContext;
30use crate::CompressorExt;
31use crate::Excludes;
32use crate::FloatCompressor;
33use crate::IntCode;
34use crate::IntCompressor;
35use crate::StringCompressor;
36use crate::compressor::decimal::compress_decimal;
37use crate::compressor::float::FloatScheme;
38use crate::compressor::integer::IntegerScheme;
39use crate::compressor::string::StringScheme;
40use crate::compressor::temporal::compress_temporal;
41
42pub trait CanonicalCompressor {
47 fn compress_canonical(
49 &self,
50 array: Canonical,
51 ctx: CompressorContext,
52 excludes: Excludes,
53 ) -> VortexResult<ArrayRef>;
54
55 fn int_schemes(&self) -> &[&'static dyn IntegerScheme];
57
58 fn float_schemes(&self) -> &[&'static dyn FloatScheme];
60
61 fn string_schemes(&self) -> &[&'static dyn StringScheme];
63}
64
65#[derive(Clone)]
93pub struct BtrBlocksCompressor {
94 pub int_schemes: Vec<&'static dyn IntegerScheme>,
96
97 pub float_schemes: Vec<&'static dyn FloatScheme>,
99
100 pub string_schemes: Vec<&'static dyn StringScheme>,
102}
103
104impl Default for BtrBlocksCompressor {
105 fn default() -> Self {
106 BtrBlocksCompressorBuilder::default().build()
107 }
108}
109
110impl BtrBlocksCompressor {
111 pub fn compress(&self, array: &dyn Array) -> VortexResult<ArrayRef> {
115 let canonical = array.to_canonical()?;
117
118 let compact = canonical.compact()?;
120
121 self.compress_canonical(compact, CompressorContext::default(), Excludes::none())
122 }
123
124 pub(crate) fn integer_compressor(&self) -> IntCompressor<'_> {
125 IntCompressor {
126 btr_blocks_compressor: self,
127 }
128 }
129
130 pub(crate) fn float_compressor(&self) -> FloatCompressor<'_> {
131 FloatCompressor {
132 btr_blocks_compressor: self,
133 }
134 }
135
136 pub(crate) fn string_compressor(&self) -> StringCompressor<'_> {
137 StringCompressor {
138 btr_blocks_compressor: self,
139 }
140 }
141
142 fn compress_list_array(
144 &self,
145 list_array: ListArray,
146 ctx: CompressorContext,
147 ) -> VortexResult<ArrayRef> {
148 let list_array = list_array.reset_offsets(true)?;
152
153 let compressed_elems = self.compress(list_array.elements())?;
154
155 let compressed_offsets = self.compress_canonical(
159 Canonical::Primitive(list_array.offsets().to_primitive().narrow()?),
160 ctx,
161 Excludes::from(&[IntCode::Dict]),
162 )?;
163
164 Ok(ListArray::try_new(
165 compressed_elems,
166 compressed_offsets,
167 list_array.validity().clone(),
168 )?
169 .into_array())
170 }
171
172 fn compress_list_view_array(
175 &self,
176 list_view: ListViewArray,
177 ctx: CompressorContext,
178 ) -> VortexResult<ArrayRef> {
179 let compressed_elems = self.compress(list_view.elements())?;
180 let compressed_offsets = self.compress_canonical(
181 Canonical::Primitive(list_view.offsets().to_primitive().narrow()?),
182 ctx,
183 Excludes::none(),
184 )?;
185 let compressed_sizes = self.compress_canonical(
186 Canonical::Primitive(list_view.sizes().to_primitive().narrow()?),
187 ctx,
188 Excludes::none(),
189 )?;
190 Ok(ListViewArray::try_new(
191 compressed_elems,
192 compressed_offsets,
193 compressed_sizes,
194 list_view.validity().clone(),
195 )?
196 .into_array())
197 }
198}
199
200impl CanonicalCompressor for BtrBlocksCompressor {
201 fn compress_canonical(
205 &self,
206 array: Canonical,
207 ctx: CompressorContext,
208 excludes: Excludes,
209 ) -> VortexResult<ArrayRef> {
210 match array {
211 Canonical::Null(null_array) => Ok(null_array.into_array()),
212 Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
214 Canonical::Primitive(primitive) => {
215 if primitive.ptype().is_int() {
216 self.integer_compressor()
217 .compress(self, &primitive, ctx, excludes.int)
218 } else {
219 self.float_compressor()
220 .compress(self, &primitive, ctx, excludes.float)
221 }
222 }
223 Canonical::Decimal(decimal) => compress_decimal(self, &decimal),
224 Canonical::Struct(struct_array) => {
225 let fields = struct_array
226 .unmasked_fields()
227 .iter()
228 .map(|field| self.compress(field))
229 .collect::<Result<Vec<_>, _>>()?;
230
231 Ok(StructArray::try_new(
232 struct_array.names().clone(),
233 fields,
234 struct_array.len(),
235 struct_array.validity().clone(),
236 )?
237 .into_array())
238 }
239 Canonical::List(list_view_array) => {
240 if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
241 let list_array = list_from_list_view(list_view_array)?;
244 self.compress_list_array(list_array, ctx)
245 } else {
246 self.compress_list_view_array(list_view_array, ctx)
247 }
248 }
249 Canonical::FixedSizeList(fsl_array) => {
250 let compressed_elems = self.compress(fsl_array.elements())?;
251
252 Ok(FixedSizeListArray::try_new(
253 compressed_elems,
254 fsl_array.list_size(),
255 fsl_array.validity().clone(),
256 fsl_array.len(),
257 )?
258 .into_array())
259 }
260 Canonical::VarBinView(strings) => {
261 if strings
262 .dtype()
263 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
264 {
265 self.string_compressor()
266 .compress(self, &strings, ctx, excludes.string)
267 } else {
268 Ok(strings.into_array())
270 }
271 }
272 Canonical::Extension(ext_array) => {
273 if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array())
275 && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
276 {
277 if is_constant_opts(
278 temporal_array.as_ref(),
279 &IsConstantOpts {
280 cost: Cost::Canonicalize,
281 },
282 )?
283 .unwrap_or_default()
284 {
285 return Ok(ConstantArray::new(
286 temporal_array.as_ref().scalar_at(0)?,
287 ext_array.len(),
288 )
289 .into_array());
290 }
291 return compress_temporal(self, temporal_array);
292 }
293
294 let compressed_storage = self.compress(ext_array.storage())?;
296
297 Ok(
298 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
299 .into_array(),
300 )
301 }
302 }
303 }
304
305 fn int_schemes(&self) -> &[&'static dyn IntegerScheme] {
306 &self.int_schemes
307 }
308
309 fn float_schemes(&self) -> &[&'static dyn FloatScheme] {
310 &self.float_schemes
311 }
312
313 fn string_schemes(&self) -> &[&'static dyn StringScheme] {
314 &self.string_schemes
315 }
316}
317
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::Array;
    use vortex_array::IntoArray;
    use vortex_array::arrays::ListVTable;
    use vortex_array::arrays::ListViewArray;
    use vortex_array::arrays::ListViewVTable;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Round-trips a `ListViewArray` through the compressor. It checks both the
    /// output encoding and that the logical values are preserved.
    ///
    /// `expect_list` is `true` when the output should be a `ListArray` and
    /// `false` when it should remain a `ListViewArray`.
    #[rstest]
    #[case::zctl(
        // SAFETY: the lists cover elements [0, 3) and [3, 5): in bounds, sorted,
        // non-overlapping and contiguous — presumably the contract required by
        // `new_unchecked` and `with_zero_copy_to_list(true)`; confirm in ListViewArray.
        unsafe {
            ListViewArray::new_unchecked(
                buffer![1i32, 2, 3, 4, 5].into_array(),
                buffer![0i32, 3].into_array(),
                buffer![3i32, 2].into_array(),
                Validity::NonNullable,
            ).with_zero_copy_to_list(true)
        },
        true,
    )]
    #[case::overlapping(
        // All three lists view the same three elements, so they overlap.
        ListViewArray::new(
            buffer![1i32, 2, 3].into_array(),
            buffer![0i32, 0, 0].into_array(),
            buffer![3i32, 3, 3].into_array(),
            Validity::NonNullable,
        ),
        false,
    )]
    fn listview_compress_roundtrip(
        #[case] input: ListViewArray,
        #[case] expect_list: bool,
    ) -> VortexResult<()> {
        let compressor = BtrBlocksCompressor::default();
        let compressed = compressor.compress(input.as_ref())?;

        let has_expected_encoding = if expect_list {
            compressed.as_opt::<ListVTable>().is_some()
        } else {
            compressed.as_opt::<ListViewVTable>().is_some()
        };
        assert!(has_expected_encoding);

        assert_arrays_eq!(compressed, input);
        Ok(())
    }
}