vortex_fsst/compute/
mod.rs1mod cast;
5mod compare;
6mod filter;
7
8use vortex_array::Array;
9use vortex_array::ArrayRef;
10use vortex_array::IntoArray;
11use vortex_array::arrays::VarBinVTable;
12use vortex_array::compute::TakeKernel;
13use vortex_array::compute::TakeKernelAdapter;
14use vortex_array::compute::fill_null;
15use vortex_array::compute::take;
16use vortex_array::register_kernel;
17use vortex_error::VortexResult;
18use vortex_scalar::Scalar;
19use vortex_scalar::ScalarValue;
20
21use crate::FSSTArray;
22use crate::FSSTVTable;
23
24impl TakeKernel for FSSTVTable {
25 fn take(&self, array: &FSSTArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
27 Ok(FSSTArray::try_new(
28 array
29 .dtype()
30 .clone()
31 .union_nullability(indices.dtype().nullability()),
32 array.symbols().clone(),
33 array.symbol_lengths().clone(),
34 take(array.codes().as_ref(), indices)?
35 .as_::<VarBinVTable>()
36 .clone(),
37 fill_null(
38 &take(array.uncompressed_lengths(), indices)?,
39 &Scalar::new(
40 array.uncompressed_lengths_dtype().clone(),
41 ScalarValue::from(0),
42 ),
43 )?,
44 )?
45 .into_array())
46 }
47}
48
49register_kernel!(TakeKernelAdapter(FSSTVTable).lift());
50
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::VarBinArray;
    use vortex_array::compute::conformance::consistency::test_array_consistency;
    use vortex_array::compute::conformance::take::test_take_conformance;
    use vortex_array::compute::take;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;

    use crate::FSSTArray;
    use crate::fsst_compress;
    use crate::fsst_train_compressor;

    /// Number of strings in the `fsst_large` consistency case.
    const LARGE_CASE_LEN: usize = 1500;

    /// Templates repeated in order to fill the `fsst_large` consistency case.
    const LARGE_CASE_TEMPLATES: [&str; 10] = [
        "https://www.example.com/page",
        "https://www.test.org/data",
        "https://www.vortex.dev/docs",
        "https://www.github.com/apache/arrow",
        "https://www.rust-lang.org/learn",
        "SELECT * FROM table WHERE id = ",
        "INSERT INTO users (name, email) VALUES",
        "UPDATE records SET status = 'active'",
        "DELETE FROM logs WHERE timestamp < ",
        "CREATE TABLE data (id INT, value TEXT)",
    ];

    /// Builds a UTF-8 `VarBinArray` with the given nullability from a fixed list of
    /// optional strings.
    fn utf8<const N: usize>(values: [Option<&str>; N], nullability: Nullability) -> VarBinArray {
        VarBinArray::from_iter(values, DType::Utf8(nullability))
    }

    /// Trains an FSST compressor on `strings` and compresses that same array with it.
    fn compress(strings: &VarBinArray) -> FSSTArray {
        let compressor = fsst_train_compressor(strings);
        fsst_compress(strings, &compressor)
    }

    /// The dtype of a take result is nullable exactly when the indices are nullable.
    #[test]
    fn test_take_null() {
        let fsst = compress(&utf8([Some("h")], Nullability::NonNullable));

        // Non-nullable indices leave the non-nullable dtype untouched.
        let plain_indices = (0..1).collect::<PrimitiveArray>();
        let taken = take(fsst.as_ref(), plain_indices.as_ref()).unwrap();
        assert_eq!(taken.dtype(), &DType::Utf8(Nullability::NonNullable));

        // Nullable indices make the result nullable, even with no null index present.
        let nullable_indices: PrimitiveArray = PrimitiveArray::from_option_iter(vec![Some(0)]);
        let taken = take(fsst.as_ref(), nullable_indices.as_ref()).unwrap();
        assert_eq!(taken.dtype(), &DType::Utf8(Nullability::Nullable));
    }

    /// Runs the shared take conformance suite against FSST-compressed inputs.
    #[rstest]
    #[case(utf8(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        Nullability::NonNullable,
    ))]
    #[case(utf8(
        [Some("hello"), None, Some("world"), Some("test"), None],
        Nullability::Nullable,
    ))]
    #[case(utf8(["single element"].map(Some), Nullability::NonNullable))]
    fn test_take_fsst_conformance(#[case] strings: VarBinArray) {
        test_take_conformance(compress(&strings).as_ref());
    }

    /// Runs the shared array consistency suite against a range of FSST arrays.
    #[rstest]
    // A handful of plain, non-null strings.
    #[case::fsst_simple(compress(&utf8(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        Nullability::NonNullable,
    )))]
    // Strings interleaved with nulls.
    #[case::fsst_nullable(compress(&utf8(
        [Some("hello"), None, Some("world"), Some("test"), None],
        Nullability::Nullable,
    )))]
    // Strings sharing a common prefix.
    #[case::fsst_repetitive(compress(&utf8(
        ["http://example.com", "http://test.com", "http://vortex.dev", "http://data.org"].map(Some),
        Nullability::NonNullable,
    )))]
    // A single-element array.
    #[case::fsst_single(compress(&utf8(["single element"].map(Some), Nullability::NonNullable)))]
    // Empty strings mixed with non-empty ones.
    #[case::fsst_empty_strings(compress(&utf8(
        ["", "test", "", "hello", ""].map(Some),
        Nullability::NonNullable,
    )))]
    // A larger array: the ten templates repeated in order, 1500 strings in total.
    #[case::fsst_large({
        let data: Vec<Option<&str>> = LARGE_CASE_TEMPLATES
            .iter()
            .copied()
            .cycle()
            .take(LARGE_CASE_LEN)
            .map(Some)
            .collect();
        compress(&VarBinArray::from_iter(data, DType::Utf8(Nullability::NonNullable)))
    })]
    fn test_fsst_consistency(#[case] array: FSSTArray) {
        test_array_consistency(array.as_ref());
    }
}