vortex_fsst/compute/
mod.rs1mod compare;
2mod filter;
3
4use vortex_array::arrays::{VarBinArray, varbin_scalar};
5use vortex_array::builders::ArrayBuilder;
6use vortex_array::compute::{ScalarAtFn, SliceFn, TakeFn, fill_null, scalar_at, slice, take};
7use vortex_array::vtable::ComputeVTable;
8use vortex_array::{Array, ArrayExt, ArrayRef};
9use vortex_buffer::ByteBuffer;
10use vortex_error::{VortexResult, vortex_err};
11use vortex_scalar::{Scalar, ScalarValue};
12
13use crate::{FSSTArray, FSSTEncoding};
14
15impl ComputeVTable for FSSTEncoding {
16 fn scalar_at_fn(&self) -> Option<&dyn ScalarAtFn<&dyn Array>> {
17 Some(self)
18 }
19
20 fn slice_fn(&self) -> Option<&dyn SliceFn<&dyn Array>> {
21 Some(self)
22 }
23
24 fn take_fn(&self) -> Option<&dyn TakeFn<&dyn Array>> {
25 Some(self)
26 }
27}
28
29impl SliceFn<&FSSTArray> for FSSTEncoding {
30 fn slice(&self, array: &FSSTArray, start: usize, stop: usize) -> VortexResult<ArrayRef> {
31 Ok(FSSTArray::try_new(
34 array.dtype().clone(),
35 array.symbols().clone(),
36 array.symbol_lengths().clone(),
37 slice(array.codes(), start, stop)?
38 .as_::<VarBinArray>()
39 .clone(),
40 slice(array.uncompressed_lengths(), start, stop)?,
41 )?
42 .into_array())
43 }
44}
45
46impl TakeFn<&FSSTArray> for FSSTEncoding {
47 fn take(&self, array: &FSSTArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
49 Ok(FSSTArray::try_new(
50 array.dtype().clone(),
51 array.symbols().clone(),
52 array.symbol_lengths().clone(),
53 take(array.codes(), indices)?.as_::<VarBinArray>().clone(),
54 fill_null(
55 &take(array.uncompressed_lengths(), indices)?,
56 Scalar::new(
57 array.uncompressed_lengths_dtype().clone(),
58 ScalarValue::from(0),
59 ),
60 )?,
61 )?
62 .into_array())
63 }
64
65 fn take_into(
66 &self,
67 array: &FSSTArray,
68 indices: &dyn Array,
69 builder: &mut dyn ArrayBuilder,
70 ) -> VortexResult<()> {
71 builder.extend_from_array(&take(array, indices)?)
72 }
73}
74
75impl ScalarAtFn<&FSSTArray> for FSSTEncoding {
76 fn scalar_at(&self, array: &FSSTArray, index: usize) -> VortexResult<Scalar> {
77 let compressed = scalar_at(array.codes(), index)?;
78 let binary_datum = compressed
79 .as_binary()
80 .value()
81 .ok_or_else(|| vortex_err!("expected null to already be handled"))?;
82
83 let decoded_buffer =
84 ByteBuffer::from(array.decompressor().decompress(binary_datum.as_slice()));
85 Ok(varbin_scalar(decoded_buffer, array.dtype()))
86 }
87}