vortex_fsst/compute/
mod.rs

1mod compare;
2
3use vortex_array::arrays::{VarBinArray, varbin_scalar};
4use vortex_array::builders::ArrayBuilder;
5use vortex_array::compute::{
6    CompareFn, FilterKernel, FilterKernelAdapter, KernelRef, ScalarAtFn, SliceFn, TakeFn,
7    fill_null, filter, scalar_at, slice, take,
8};
9use vortex_array::vtable::ComputeVTable;
10use vortex_array::{Array, ArrayComputeImpl, ArrayExt, ArrayRef};
11use vortex_buffer::ByteBuffer;
12use vortex_error::{VortexResult, vortex_err};
13use vortex_mask::Mask;
14use vortex_scalar::{Scalar, ScalarValue};
15
16use crate::{FSSTArray, FSSTEncoding};
17
18impl ArrayComputeImpl for FSSTArray {
19    const FILTER: Option<KernelRef> = FilterKernelAdapter(FSSTEncoding).some();
20}
21impl ComputeVTable for FSSTEncoding {
22    fn compare_fn(&self) -> Option<&dyn CompareFn<&dyn Array>> {
23        Some(self)
24    }
25
26    fn scalar_at_fn(&self) -> Option<&dyn ScalarAtFn<&dyn Array>> {
27        Some(self)
28    }
29
30    fn slice_fn(&self) -> Option<&dyn SliceFn<&dyn Array>> {
31        Some(self)
32    }
33
34    fn take_fn(&self) -> Option<&dyn TakeFn<&dyn Array>> {
35        Some(self)
36    }
37}
38
39impl SliceFn<&FSSTArray> for FSSTEncoding {
40    fn slice(&self, array: &FSSTArray, start: usize, stop: usize) -> VortexResult<ArrayRef> {
41        // Slicing an FSST array leaves the symbol table unmodified,
42        // only slicing the `codes` array.
43        Ok(FSSTArray::try_new(
44            array.dtype().clone(),
45            array.symbols().clone(),
46            array.symbol_lengths().clone(),
47            slice(array.codes(), start, stop)?
48                .as_::<VarBinArray>()
49                .clone(),
50            slice(array.uncompressed_lengths(), start, stop)?,
51        )?
52        .into_array())
53    }
54}
55
56impl TakeFn<&FSSTArray> for FSSTEncoding {
57    // Take on an FSSTArray is a simple take on the codes array.
58    fn take(&self, array: &FSSTArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
59        Ok(FSSTArray::try_new(
60            array.dtype().clone(),
61            array.symbols().clone(),
62            array.symbol_lengths().clone(),
63            take(array.codes(), indices)?.as_::<VarBinArray>().clone(),
64            fill_null(
65                &take(array.uncompressed_lengths(), indices)?,
66                Scalar::new(
67                    array.uncompressed_lengths_dtype().clone(),
68                    ScalarValue::from(0),
69                ),
70            )?,
71        )?
72        .into_array())
73    }
74
75    fn take_into(
76        &self,
77        array: &FSSTArray,
78        indices: &dyn Array,
79        builder: &mut dyn ArrayBuilder,
80    ) -> VortexResult<()> {
81        builder.extend_from_array(&take(array, indices)?)
82    }
83}
84
85impl ScalarAtFn<&FSSTArray> for FSSTEncoding {
86    fn scalar_at(&self, array: &FSSTArray, index: usize) -> VortexResult<Scalar> {
87        let compressed = scalar_at(array.codes(), index)?;
88        let binary_datum = compressed
89            .as_binary()
90            .value()
91            .ok_or_else(|| vortex_err!("expected null to already be handled"))?;
92
93        let decoded_buffer =
94            ByteBuffer::from(array.decompressor().decompress(binary_datum.as_slice()));
95        Ok(varbin_scalar(decoded_buffer, array.dtype()))
96    }
97}
98
99impl FilterKernel for FSSTEncoding {
100    // Filtering an FSSTArray filters the codes array, leaving the symbols array untouched
101    fn filter(&self, array: &FSSTArray, mask: &Mask) -> VortexResult<ArrayRef> {
102        Ok(FSSTArray::try_new(
103            array.dtype().clone(),
104            array.symbols().clone(),
105            array.symbol_lengths().clone(),
106            filter(array.codes(), mask)?.as_::<VarBinArray>().clone(),
107            filter(array.uncompressed_lengths(), mask)?,
108        )?
109        .into_array())
110    }
111}