vortex_fsst/compute/
mod.rs1mod compare;
2
3use vortex_array::arrays::{VarBinArray, varbin_scalar};
4use vortex_array::builders::ArrayBuilder;
5use vortex_array::compute::{
6 CompareFn, FilterKernel, FilterKernelAdapter, KernelRef, ScalarAtFn, SliceFn, TakeFn,
7 fill_null, filter, scalar_at, slice, take,
8};
9use vortex_array::vtable::ComputeVTable;
10use vortex_array::{Array, ArrayComputeImpl, ArrayExt, ArrayRef};
11use vortex_buffer::ByteBuffer;
12use vortex_error::{VortexResult, vortex_err};
13use vortex_mask::Mask;
14use vortex_scalar::{Scalar, ScalarValue};
15
16use crate::{FSSTArray, FSSTEncoding};
17
18impl ArrayComputeImpl for FSSTArray {
19 const FILTER: Option<KernelRef> = FilterKernelAdapter(FSSTEncoding).some();
20}
21impl ComputeVTable for FSSTEncoding {
22 fn compare_fn(&self) -> Option<&dyn CompareFn<&dyn Array>> {
23 Some(self)
24 }
25
26 fn scalar_at_fn(&self) -> Option<&dyn ScalarAtFn<&dyn Array>> {
27 Some(self)
28 }
29
30 fn slice_fn(&self) -> Option<&dyn SliceFn<&dyn Array>> {
31 Some(self)
32 }
33
34 fn take_fn(&self) -> Option<&dyn TakeFn<&dyn Array>> {
35 Some(self)
36 }
37}
38
39impl SliceFn<&FSSTArray> for FSSTEncoding {
40 fn slice(&self, array: &FSSTArray, start: usize, stop: usize) -> VortexResult<ArrayRef> {
41 Ok(FSSTArray::try_new(
44 array.dtype().clone(),
45 array.symbols().clone(),
46 array.symbol_lengths().clone(),
47 slice(array.codes(), start, stop)?
48 .as_::<VarBinArray>()
49 .clone(),
50 slice(array.uncompressed_lengths(), start, stop)?,
51 )?
52 .into_array())
53 }
54}
55
56impl TakeFn<&FSSTArray> for FSSTEncoding {
57 fn take(&self, array: &FSSTArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
59 Ok(FSSTArray::try_new(
60 array.dtype().clone(),
61 array.symbols().clone(),
62 array.symbol_lengths().clone(),
63 take(array.codes(), indices)?.as_::<VarBinArray>().clone(),
64 fill_null(
65 &take(array.uncompressed_lengths(), indices)?,
66 Scalar::new(
67 array.uncompressed_lengths_dtype().clone(),
68 ScalarValue::from(0),
69 ),
70 )?,
71 )?
72 .into_array())
73 }
74
75 fn take_into(
76 &self,
77 array: &FSSTArray,
78 indices: &dyn Array,
79 builder: &mut dyn ArrayBuilder,
80 ) -> VortexResult<()> {
81 builder.extend_from_array(&take(array, indices)?)
82 }
83}
84
85impl ScalarAtFn<&FSSTArray> for FSSTEncoding {
86 fn scalar_at(&self, array: &FSSTArray, index: usize) -> VortexResult<Scalar> {
87 let compressed = scalar_at(array.codes(), index)?;
88 let binary_datum = compressed
89 .as_binary()
90 .value()
91 .ok_or_else(|| vortex_err!("expected null to already be handled"))?;
92
93 let decoded_buffer =
94 ByteBuffer::from(array.decompressor().decompress(binary_datum.as_slice()));
95 Ok(varbin_scalar(decoded_buffer, array.dtype()))
96 }
97}
98
99impl FilterKernel for FSSTEncoding {
100 fn filter(&self, array: &FSSTArray, mask: &Mask) -> VortexResult<ArrayRef> {
102 Ok(FSSTArray::try_new(
103 array.dtype().clone(),
104 array.symbols().clone(),
105 array.symbol_lengths().clone(),
106 filter(array.codes(), mask)?.as_::<VarBinArray>().clone(),
107 filter(array.uncompressed_lengths(), mask)?,
108 )?
109 .into_array())
110 }
111}