vortex_fsst/compute/
mod.rs1mod cast;
5mod compare;
6mod filter;
7
8use vortex_array::Array;
9use vortex_array::ArrayRef;
10use vortex_array::ExecutionCtx;
11use vortex_array::IntoArray;
12use vortex_array::arrays::TakeExecute;
13use vortex_array::arrays::VarBinVTable;
14use vortex_array::builtins::ArrayBuiltins;
15use vortex_array::scalar::Scalar;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_error::vortex_err;
19
20use crate::FSSTArray;
21use crate::FSSTVTable;
22
23impl TakeExecute for FSSTVTable {
24 fn take(
25 array: &FSSTArray,
26 indices: &dyn Array,
27 _ctx: &mut ExecutionCtx,
28 ) -> VortexResult<Option<ArrayRef>> {
29 Ok(Some(
30 FSSTArray::try_new(
31 array
32 .dtype()
33 .clone()
34 .union_nullability(indices.dtype().nullability()),
35 array.symbols().clone(),
36 array.symbol_lengths().clone(),
37 VarBinVTable::take(array.codes(), indices, _ctx)?
38 .vortex_expect("cannot fail")
39 .try_into::<VarBinVTable>()
40 .map_err(|_| vortex_err!("take for codes must return varbin array"))?,
41 array
42 .uncompressed_lengths()
43 .take(indices.to_array())?
44 .fill_null(Scalar::zero_value(
45 &array.uncompressed_lengths_dtype().clone(),
46 ))?,
47 )?
48 .into_array(),
49 ))
50 }
51}
52
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::Array;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::VarBinArray;
    use vortex_array::compute::conformance::consistency::test_array_consistency;
    use vortex_array::compute::conformance::take::test_take_conformance;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;

    use crate::FSSTArray;
    use crate::fsst_compress;
    use crate::fsst_train_compressor;

    /// Trains an FSST compressor on `varbin` and compresses `varbin` with it.
    fn compress(varbin: &VarBinArray) -> FSSTArray {
        let compressor = fsst_train_compressor(varbin);
        fsst_compress(varbin, &compressor)
    }

    /// The dtype of a take result must pick up the nullability of the indices.
    #[test]
    fn test_take_null() {
        let strings = VarBinArray::from_iter([Some("h")], DType::Utf8(Nullability::NonNullable));
        let fsst = compress(&strings);

        // Indices collected from a plain range: the result is expected to stay non-nullable.
        let plain_indices: PrimitiveArray = (0..1).collect();
        let taken = fsst.take(plain_indices.to_array()).unwrap();
        assert_eq!(taken.dtype(), &DType::Utf8(Nullability::NonNullable));

        // Indices built from options: the result is expected to become nullable.
        let optional_indices: PrimitiveArray = PrimitiveArray::from_option_iter(vec![Some(0)]);
        let taken = fsst.take(optional_indices.to_array()).unwrap();
        assert_eq!(taken.dtype(), &DType::Utf8(Nullability::Nullable));
    }

    /// Runs the shared take conformance suite against FSST-compressed inputs.
    #[rstest]
    #[case(VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    #[case(VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    ))]
    #[case(VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    fn test_take_fsst_conformance(#[case] varbin: VarBinArray) {
        let array = compress(&varbin);
        test_take_conformance(array.as_ref());
    }

    /// Runs the shared array consistency suite against a range of FSST arrays.
    #[rstest]
    // Several distinct non-null strings.
    #[case::fsst_simple(compress(&VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    // Nullable input containing nulls.
    #[case::fsst_nullable(compress(&VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    )))]
    // Strings sharing a common prefix.
    #[case::fsst_repetitive(compress(&VarBinArray::from_iter(
        ["http://example.com", "http://test.com", "http://vortex.dev", "http://data.org"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    // A single element.
    #[case::fsst_single(compress(&VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    // Empty strings interleaved with non-empty ones.
    #[case::fsst_empty_strings(compress(&VarBinArray::from_iter(
        ["", "test", "", "hello", ""].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    // 1500 elements cycling through ten URL / SQL patterns.
    #[case::fsst_large({
        let patterns: Vec<Option<&str>> = (0..1500)
            .map(|i| {
                let text = match i % 10 {
                    0 => "https://www.example.com/page",
                    1 => "https://www.test.org/data",
                    2 => "https://www.vortex.dev/docs",
                    3 => "https://www.github.com/apache/arrow",
                    4 => "https://www.rust-lang.org/learn",
                    5 => "SELECT * FROM table WHERE id = ",
                    6 => "INSERT INTO users (name, email) VALUES",
                    7 => "UPDATE records SET status = 'active'",
                    8 => "DELETE FROM logs WHERE timestamp < ",
                    _ => "CREATE TABLE data (id INT, value TEXT)",
                };
                Some(text)
            })
            .collect();
        compress(&VarBinArray::from_iter(
            patterns,
            DType::Utf8(Nullability::NonNullable),
        ))
    })]
    fn test_fsst_consistency(#[case] array: FSSTArray) {
        test_array_consistency(array.as_ref());
    }
}