1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::CanonicalValidity;
9use vortex_array::DynArray;
10use vortex_array::IntoArray;
11use vortex_array::LEGACY_SESSION;
12use vortex_array::ToCanonical;
13use vortex_array::VortexSessionExecute;
14use vortex_array::arrays::ConstantArray;
15use vortex_array::arrays::ExtensionArray;
16use vortex_array::arrays::FixedSizeListArray;
17use vortex_array::arrays::ListArray;
18use vortex_array::arrays::ListViewArray;
19use vortex_array::arrays::StructArray;
20use vortex_array::arrays::TemporalArray;
21use vortex_array::arrays::listview::list_from_list_view;
22use vortex_array::compute::Cost;
23use vortex_array::compute::IsConstantOpts;
24use vortex_array::compute::is_constant_opts;
25use vortex_array::dtype::DType;
26use vortex_array::dtype::Nullability;
27use vortex_array::extension::datetime::TemporalMetadata;
28use vortex_array::vtable::ValidityHelper;
29use vortex_error::VortexResult;
30
31use crate::BtrBlocksCompressorBuilder;
32use crate::CompressorContext;
33use crate::CompressorExt;
34use crate::Excludes;
35use crate::FloatCompressor;
36use crate::IntCode;
37use crate::IntCompressor;
38use crate::StringCompressor;
39use crate::compressor::decimal::compress_decimal;
40use crate::compressor::float::FloatScheme;
41use crate::compressor::integer::IntegerScheme;
42use crate::compressor::string::StringScheme;
43use crate::compressor::temporal::compress_temporal;
44
45pub trait CanonicalCompressor {
50 fn compress_canonical(
52 &self,
53 array: Canonical,
54 ctx: CompressorContext,
55 excludes: Excludes,
56 ) -> VortexResult<ArrayRef>;
57
58 fn int_schemes(&self) -> &[&'static dyn IntegerScheme];
60
61 fn float_schemes(&self) -> &[&'static dyn FloatScheme];
63
64 fn string_schemes(&self) -> &[&'static dyn StringScheme];
66}
67
68#[derive(Clone)]
96pub struct BtrBlocksCompressor {
97 pub int_schemes: Vec<&'static dyn IntegerScheme>,
99
100 pub float_schemes: Vec<&'static dyn FloatScheme>,
102
103 pub string_schemes: Vec<&'static dyn StringScheme>,
105}
106
107impl Default for BtrBlocksCompressor {
108 fn default() -> Self {
109 BtrBlocksCompressorBuilder::default().build()
110 }
111}
112
113impl BtrBlocksCompressor {
114 pub fn compress(&self, array: &ArrayRef) -> VortexResult<ArrayRef> {
118 let canonical = array
121 .clone()
122 .execute::<CanonicalValidity>(&mut LEGACY_SESSION.create_execution_ctx())?
123 .0;
124
125 let compact = canonical.compact()?;
127
128 self.compress_canonical(compact, CompressorContext::default(), Excludes::none())
129 }
130
131 pub(crate) fn integer_compressor(&self) -> IntCompressor<'_> {
132 IntCompressor {
133 btr_blocks_compressor: self,
134 }
135 }
136
137 pub(crate) fn float_compressor(&self) -> FloatCompressor<'_> {
138 FloatCompressor {
139 btr_blocks_compressor: self,
140 }
141 }
142
143 pub(crate) fn string_compressor(&self) -> StringCompressor<'_> {
144 StringCompressor {
145 btr_blocks_compressor: self,
146 }
147 }
148
149 fn compress_list_array(
151 &self,
152 list_array: ListArray,
153 ctx: CompressorContext,
154 ) -> VortexResult<ArrayRef> {
155 let list_array = list_array.reset_offsets(true)?;
159
160 let compressed_elems = self.compress(list_array.elements())?;
161
162 let compressed_offsets = self.compress_canonical(
166 Canonical::Primitive(list_array.offsets().to_primitive().narrow()?),
167 ctx,
168 Excludes::from(&[IntCode::Dict]),
169 )?;
170
171 Ok(ListArray::try_new(
172 compressed_elems,
173 compressed_offsets,
174 list_array.validity().clone(),
175 )?
176 .into_array())
177 }
178
179 fn compress_list_view_array(
182 &self,
183 list_view: ListViewArray,
184 ctx: CompressorContext,
185 ) -> VortexResult<ArrayRef> {
186 let compressed_elems = self.compress(list_view.elements())?;
187 let compressed_offsets = self.compress_canonical(
188 Canonical::Primitive(list_view.offsets().to_primitive().narrow()?),
189 ctx,
190 Excludes::none(),
191 )?;
192 let compressed_sizes = self.compress_canonical(
193 Canonical::Primitive(list_view.sizes().to_primitive().narrow()?),
194 ctx,
195 Excludes::none(),
196 )?;
197 Ok(ListViewArray::try_new(
198 compressed_elems,
199 compressed_offsets,
200 compressed_sizes,
201 list_view.validity().clone(),
202 )?
203 .into_array())
204 }
205}
206
207impl CanonicalCompressor for BtrBlocksCompressor {
208 fn compress_canonical(
212 &self,
213 array: Canonical,
214 ctx: CompressorContext,
215 excludes: Excludes,
216 ) -> VortexResult<ArrayRef> {
217 match array {
218 Canonical::Null(null_array) => Ok(null_array.into_array()),
219 Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
221 Canonical::Primitive(primitive) => {
222 if primitive.ptype().is_int() {
223 self.integer_compressor()
224 .compress(self, &primitive, ctx, excludes.int)
225 } else {
226 self.float_compressor()
227 .compress(self, &primitive, ctx, excludes.float)
228 }
229 }
230 Canonical::Decimal(decimal) => compress_decimal(self, &decimal),
231 Canonical::Struct(struct_array) => {
232 let fields = struct_array
233 .unmasked_fields()
234 .iter()
235 .map(|field| self.compress(field))
236 .collect::<Result<Vec<_>, _>>()?;
237
238 Ok(StructArray::try_new(
239 struct_array.names().clone(),
240 fields,
241 struct_array.len(),
242 struct_array.validity().clone(),
243 )?
244 .into_array())
245 }
246 Canonical::List(list_view_array) => {
247 if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
248 let list_array = list_from_list_view(list_view_array)?;
251 self.compress_list_array(list_array, ctx)
252 } else {
253 self.compress_list_view_array(list_view_array, ctx)
254 }
255 }
256 Canonical::FixedSizeList(fsl_array) => {
257 let compressed_elems = self.compress(fsl_array.elements())?;
258
259 Ok(FixedSizeListArray::try_new(
260 compressed_elems,
261 fsl_array.list_size(),
262 fsl_array.validity().clone(),
263 fsl_array.len(),
264 )?
265 .into_array())
266 }
267 Canonical::VarBinView(strings) => {
268 if strings
269 .dtype()
270 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
271 {
272 self.string_compressor()
273 .compress(self, &strings, ctx, excludes.string)
274 } else {
275 Ok(strings.into_array())
277 }
278 }
279 Canonical::Extension(ext_array) => {
280 if let Ok(temporal_array) = TemporalArray::try_from(ext_array.clone().into_array())
282 && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
283 {
284 if is_constant_opts(
285 &ext_array.clone().into_array(),
286 &IsConstantOpts {
287 cost: Cost::Canonicalize,
288 },
289 )?
290 .unwrap_or_default()
291 {
292 return Ok(ConstantArray::new(
293 temporal_array.as_ref().scalar_at(0)?,
294 ext_array.len(),
295 )
296 .into_array());
297 }
298 return compress_temporal(self, temporal_array);
299 }
300
301 let compressed_storage = self.compress(ext_array.storage_array())?;
303
304 Ok(
305 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
306 .into_array(),
307 )
308 }
309 }
310 }
311
312 fn int_schemes(&self) -> &[&'static dyn IntegerScheme] {
313 &self.int_schemes
314 }
315
316 fn float_schemes(&self) -> &[&'static dyn FloatScheme] {
317 &self.float_schemes
318 }
319
320 fn string_schemes(&self) -> &[&'static dyn StringScheme] {
321 &self.string_schemes
322 }
323}
324
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::DynArray;
    use vortex_array::IntoArray;
    use vortex_array::arrays::ListVTable;
    use vortex_array::arrays::ListViewArray;
    use vortex_array::arrays::ListViewVTable;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    // Round-trips a list view through the default compressor and checks both the encoding
    // of the result (`ListVTable` vs. `ListViewVTable`) and that the values are unchanged.
    #[rstest]
    // Two lists, elements [0, 3) and [3, 5), explicitly flagged as zero-copy-to-list:
    // expected to come back as a list array.
    #[case::zctl(
        unsafe {
            // SAFETY: offsets [0, 3] with sizes [3, 2] stay within the 5 elements and do not
            // overlap. NOTE(review): confirm this satisfies every precondition of
            // `new_unchecked` / `with_zero_copy_to_list`.
            ListViewArray::new_unchecked(
                buffer![1i32, 2, 3, 4, 5].into_array(),
                buffer![0i32, 3].into_array(),
                buffer![3i32, 2].into_array(),
                Validity::NonNullable,
            ).with_zero_copy_to_list(true)
        },
        true,
    )]
    // Three lists that all view the same elements [0, 3): expected to stay a list view.
    #[case::overlapping(
        ListViewArray::new(
            buffer![1i32, 2, 3].into_array(),
            buffer![0i32, 0, 0].into_array(),
            buffer![3i32, 3, 3].into_array(),
            Validity::NonNullable,
        ),
        false,
    )]
    fn listview_compress_roundtrip(
        #[case] input: ListViewArray,
        #[case] expect_list: bool,
    ) -> VortexResult<()> {
        let compressor = BtrBlocksCompressor::default();
        let result = compressor.compress(&input.clone().into_array())?;

        // Check which encoding the compressor produced.
        if expect_list {
            assert!(result.as_opt::<ListVTable>().is_some());
        } else {
            assert!(result.as_opt::<ListViewVTable>().is_some());
        }

        // Whatever the encoding, the logical contents must match the input.
        assert_arrays_eq!(result, input);
        Ok(())
    }
}
375}