1use vortex_array::ArrayRef;
7use vortex_array::Canonical;
8use vortex_array::CanonicalValidity;
9use vortex_array::DynArray;
10use vortex_array::IntoArray;
11use vortex_array::LEGACY_SESSION;
12use vortex_array::ToCanonical;
13use vortex_array::VortexSessionExecute;
14use vortex_array::aggregate_fn::fns::is_constant::is_constant;
15use vortex_array::arrays::ConstantArray;
16use vortex_array::arrays::ExtensionArray;
17use vortex_array::arrays::FixedSizeListArray;
18use vortex_array::arrays::ListArray;
19use vortex_array::arrays::ListViewArray;
20use vortex_array::arrays::StructArray;
21use vortex_array::arrays::TemporalArray;
22use vortex_array::arrays::listview::list_from_list_view;
23use vortex_array::dtype::DType;
24use vortex_array::dtype::Nullability;
25use vortex_array::extension::datetime::TemporalMetadata;
26use vortex_array::vtable::ValidityHelper;
27use vortex_error::VortexResult;
28use vortex_error::vortex_bail;
29
30use crate::BtrBlocksCompressorBuilder;
31use crate::CompressorContext;
32use crate::CompressorExt;
33use crate::Excludes;
34use crate::FloatCompressor;
35use crate::IntCode;
36use crate::IntCompressor;
37use crate::StringCompressor;
38use crate::compressor::decimal::compress_decimal;
39use crate::compressor::float::FloatScheme;
40use crate::compressor::integer::IntegerScheme;
41use crate::compressor::string::StringScheme;
42use crate::compressor::temporal::compress_temporal;
43
44pub trait CanonicalCompressor {
49 fn compress_canonical(
51 &self,
52 array: Canonical,
53 ctx: CompressorContext,
54 excludes: Excludes,
55 ) -> VortexResult<ArrayRef>;
56
57 fn int_schemes(&self) -> &[&'static dyn IntegerScheme];
59
60 fn float_schemes(&self) -> &[&'static dyn FloatScheme];
62
63 fn string_schemes(&self) -> &[&'static dyn StringScheme];
65}
66
67#[derive(Clone)]
95pub struct BtrBlocksCompressor {
96 pub int_schemes: Vec<&'static dyn IntegerScheme>,
98
99 pub float_schemes: Vec<&'static dyn FloatScheme>,
101
102 pub string_schemes: Vec<&'static dyn StringScheme>,
104}
105
106impl Default for BtrBlocksCompressor {
107 fn default() -> Self {
108 BtrBlocksCompressorBuilder::default().build()
109 }
110}
111
112impl BtrBlocksCompressor {
113 pub fn compress(&self, array: &ArrayRef) -> VortexResult<ArrayRef> {
117 let canonical = array
120 .clone()
121 .execute::<CanonicalValidity>(&mut LEGACY_SESSION.create_execution_ctx())?
122 .0;
123
124 let compact = canonical.compact()?;
126
127 self.compress_canonical(compact, CompressorContext::default(), Excludes::none())
128 }
129
130 pub(crate) fn integer_compressor(&self) -> IntCompressor<'_> {
131 IntCompressor {
132 btr_blocks_compressor: self,
133 }
134 }
135
136 pub(crate) fn float_compressor(&self) -> FloatCompressor<'_> {
137 FloatCompressor {
138 btr_blocks_compressor: self,
139 }
140 }
141
142 pub(crate) fn string_compressor(&self) -> StringCompressor<'_> {
143 StringCompressor {
144 btr_blocks_compressor: self,
145 }
146 }
147
148 fn compress_list_array(
150 &self,
151 list_array: ListArray,
152 ctx: CompressorContext,
153 ) -> VortexResult<ArrayRef> {
154 let list_array = list_array.reset_offsets(true)?;
158
159 let compressed_elems = self.compress(list_array.elements())?;
160
161 let compressed_offsets = self.compress_canonical(
165 Canonical::Primitive(list_array.offsets().to_primitive().narrow()?),
166 ctx,
167 Excludes::from(&[IntCode::Dict]),
168 )?;
169
170 Ok(ListArray::try_new(
171 compressed_elems,
172 compressed_offsets,
173 list_array.validity().clone(),
174 )?
175 .into_array())
176 }
177
178 fn compress_list_view_array(
181 &self,
182 list_view: ListViewArray,
183 ctx: CompressorContext,
184 ) -> VortexResult<ArrayRef> {
185 let compressed_elems = self.compress(list_view.elements())?;
186 let compressed_offsets = self.compress_canonical(
187 Canonical::Primitive(list_view.offsets().to_primitive().narrow()?),
188 ctx,
189 Excludes::none(),
190 )?;
191 let compressed_sizes = self.compress_canonical(
192 Canonical::Primitive(list_view.sizes().to_primitive().narrow()?),
193 ctx,
194 Excludes::none(),
195 )?;
196 Ok(ListViewArray::try_new(
197 compressed_elems,
198 compressed_offsets,
199 compressed_sizes,
200 list_view.validity().clone(),
201 )?
202 .into_array())
203 }
204}
205
206impl CanonicalCompressor for BtrBlocksCompressor {
207 fn compress_canonical(
211 &self,
212 array: Canonical,
213 ctx: CompressorContext,
214 excludes: Excludes,
215 ) -> VortexResult<ArrayRef> {
216 match array {
217 Canonical::Null(null_array) => Ok(null_array.into_array()),
218 Canonical::Bool(bool_array) => Ok(bool_array.into_array()),
219 Canonical::Primitive(primitive) => {
220 if primitive.ptype().is_int() {
221 self.integer_compressor()
222 .compress(self, &primitive, ctx, excludes.int)
223 } else {
224 self.float_compressor()
225 .compress(self, &primitive, ctx, excludes.float)
226 }
227 }
228 Canonical::Decimal(decimal) => compress_decimal(self, &decimal),
229 Canonical::Struct(struct_array) => {
230 let fields = struct_array
231 .unmasked_fields()
232 .iter()
233 .map(|field| self.compress(field))
234 .collect::<Result<Vec<_>, _>>()?;
235
236 Ok(StructArray::try_new(
237 struct_array.names().clone(),
238 fields,
239 struct_array.len(),
240 struct_array.validity().clone(),
241 )?
242 .into_array())
243 }
244 Canonical::List(list_view_array) => {
245 if list_view_array.is_zero_copy_to_list() || list_view_array.elements().is_empty() {
246 let list_array = list_from_list_view(list_view_array)?;
249 self.compress_list_array(list_array, ctx)
250 } else {
251 self.compress_list_view_array(list_view_array, ctx)
252 }
253 }
254 Canonical::FixedSizeList(fsl_array) => {
255 let compressed_elems = self.compress(fsl_array.elements())?;
256
257 Ok(FixedSizeListArray::try_new(
258 compressed_elems,
259 fsl_array.list_size(),
260 fsl_array.validity().clone(),
261 fsl_array.len(),
262 )?
263 .into_array())
264 }
265 Canonical::VarBinView(strings) => {
266 if strings
267 .dtype()
268 .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable))
269 {
270 self.string_compressor()
271 .compress(self, &strings, ctx, excludes.string)
272 } else {
273 Ok(strings.into_array())
275 }
276 }
277 Canonical::Extension(ext_array) => {
278 if let Ok(temporal_array) = TemporalArray::try_from(ext_array.clone().into_array())
280 && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata()
281 {
282 let mut ctx = LEGACY_SESSION.create_execution_ctx();
283 if is_constant(&ext_array.clone().into_array(), &mut ctx)? {
284 return Ok(ConstantArray::new(
285 temporal_array.as_ref().scalar_at(0)?,
286 ext_array.len(),
287 )
288 .into_array());
289 }
290 return compress_temporal(self, temporal_array);
291 }
292
293 let compressed_storage = self.compress(ext_array.storage_array())?;
295
296 Ok(
297 ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
298 .into_array(),
299 )
300 }
301 Canonical::Variant(_) => {
302 vortex_bail!("Variant arrays can not be compressed")
303 }
304 }
305 }
306
307 fn int_schemes(&self) -> &[&'static dyn IntegerScheme] {
308 &self.int_schemes
309 }
310
311 fn float_schemes(&self) -> &[&'static dyn FloatScheme] {
312 &self.float_schemes
313 }
314
315 fn string_schemes(&self) -> &[&'static dyn StringScheme] {
316 &self.string_schemes
317 }
318}
319
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::DynArray;
    use vortex_array::IntoArray;
    use vortex_array::arrays::List;
    use vortex_array::arrays::ListView;
    use vortex_array::arrays::ListViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::BtrBlocksCompressor;

    /// Compresses a `ListViewArray` and checks two things:
    /// - the output encoding is `List` when `expect_list` is true and `ListView` otherwise;
    /// - the compressed result is equal to the input.
    #[rstest]
    #[case::zctl(
        unsafe {
            // SAFETY: the views [0, 3) and [3, 5) lie within the 5 elements, are ascending,
            // and neither overlap nor leave gaps. This is presumably what the
            // zero-copy-to-list flag requires.
            ListViewArray::new_unchecked(
                buffer![1i32, 2, 3, 4, 5].into_array(),
                buffer![0i32, 3].into_array(),
                buffer![3i32, 2].into_array(),
                Validity::NonNullable,
            )
            .with_zero_copy_to_list(true)
        },
        true,
    )]
    #[case::overlapping(
        ListViewArray::new(
            buffer![1i32, 2, 3].into_array(),
            buffer![0i32, 0, 0].into_array(),
            buffer![3i32, 3, 3].into_array(),
            Validity::NonNullable,
        ),
        false,
    )]
    fn listview_compress_roundtrip(
        #[case] input: ListViewArray,
        #[case] expect_list: bool,
    ) -> VortexResult<()> {
        let compressor = BtrBlocksCompressor::default();
        let result = compressor.compress(&input.clone().into_array())?;

        match expect_list {
            true => assert!(result.as_opt::<List>().is_some()),
            false => assert!(result.as_opt::<ListView>().is_some()),
        }

        assert_arrays_eq!(result, input);
        Ok(())
    }
}
370}