vortex_fsst/compute/
mod.rs1mod cast;
5mod compare;
6mod filter;
7mod like;
8
9use vortex_array::ArrayRef;
10use vortex_array::DynArray;
11use vortex_array::ExecutionCtx;
12use vortex_array::IntoArray;
13use vortex_array::arrays::VarBin;
14use vortex_array::arrays::dict::TakeExecute;
15use vortex_array::builtins::ArrayBuiltins;
16use vortex_array::scalar::Scalar;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_err;
20
21use crate::FSST;
22use crate::FSSTArray;
23
24impl TakeExecute for FSST {
25 fn take(
26 array: &FSSTArray,
27 indices: &ArrayRef,
28 _ctx: &mut ExecutionCtx,
29 ) -> VortexResult<Option<ArrayRef>> {
30 Ok(Some(
31 FSSTArray::try_new(
32 array
33 .dtype()
34 .clone()
35 .union_nullability(indices.dtype().nullability()),
36 array.symbols().clone(),
37 array.symbol_lengths().clone(),
38 VarBin::take(array.codes(), indices, _ctx)?
39 .vortex_expect("cannot fail")
40 .try_into::<VarBin>()
41 .map_err(|_| vortex_err!("take for codes must return varbin array"))?,
42 array
43 .uncompressed_lengths()
44 .take(indices.to_array())?
45 .fill_null(Scalar::zero_value(
46 &array.uncompressed_lengths_dtype().clone(),
47 ))?,
48 )?
49 .into_array(),
50 ))
51 }
52}
53
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::DynArray;
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::VarBinArray;
    use vortex_array::compute::conformance::consistency::test_array_consistency;
    use vortex_array::compute::conformance::take::test_take_conformance;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;

    use crate::FSSTArray;
    use crate::fsst_compress;
    use crate::fsst_train_compressor;

    /// Trains an FSST compressor on `varbin` and compresses `varbin` with it.
    fn compress(varbin: &VarBinArray) -> FSSTArray {
        let compressor = fsst_train_compressor(varbin);
        fsst_compress(varbin, &compressor)
    }

    #[test]
    fn test_take_null() {
        let arr = VarBinArray::from_iter([Some("h")], DType::Utf8(Nullability::NonNullable));
        let fsst = compress(&arr);

        // Non-nullable indices keep the array's non-nullable dtype.
        let idx1: PrimitiveArray = (0..1).collect();
        assert_eq!(
            fsst.take(idx1.into_array()).unwrap().dtype(),
            &DType::Utf8(Nullability::NonNullable)
        );

        // Nullable indices widen the result dtype to nullable.
        let idx2: PrimitiveArray = PrimitiveArray::from_option_iter(vec![Some(0)]);
        assert_eq!(
            fsst.take(idx2.into_array()).unwrap().dtype(),
            &DType::Utf8(Nullability::Nullable)
        );
    }

    #[rstest]
    #[case(VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    #[case(VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    ))]
    #[case(VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    fn test_take_fsst_conformance(#[case] varbin: VarBinArray) {
        test_take_conformance(&compress(&varbin).into_array());
    }

    #[rstest]
    #[case::fsst_simple(compress(&VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_nullable(compress(&VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    )))]
    #[case::fsst_repetitive(compress(&VarBinArray::from_iter(
        ["http://example.com", "http://test.com", "http://vortex.dev", "http://data.org"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_single(compress(&VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_empty_strings(compress(&VarBinArray::from_iter(
        ["", "test", "", "hello", ""].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_large({
        // 1500 rows cycling through ten URL / SQL-like strings.
        let data: Vec<Option<&str>> = (0..1500)
            .map(|i| Some(match i % 10 {
                0 => "https://www.example.com/page",
                1 => "https://www.test.org/data",
                2 => "https://www.vortex.dev/docs",
                3 => "https://www.github.com/apache/arrow",
                4 => "https://www.rust-lang.org/learn",
                5 => "SELECT * FROM table WHERE id = ",
                6 => "INSERT INTO users (name, email) VALUES",
                7 => "UPDATE records SET status = 'active'",
                8 => "DELETE FROM logs WHERE timestamp < ",
                _ => "CREATE TABLE data (id INT, value TEXT)",
            }))
            .collect();
        compress(&VarBinArray::from_iter(data, DType::Utf8(Nullability::NonNullable)))
    })]
    fn test_fsst_consistency(#[case] array: FSSTArray) {
        test_array_consistency(&array.into_array());
    }
}
179}