vortex_fsst/compute/
mod.rs1mod cast;
5mod compare;
6mod filter;
7
8use vortex_array::ArrayRef;
9use vortex_array::DynArray;
10use vortex_array::ExecutionCtx;
11use vortex_array::IntoArray;
12use vortex_array::arrays::VarBin;
13use vortex_array::arrays::dict::TakeExecute;
14use vortex_array::builtins::ArrayBuiltins;
15use vortex_array::scalar::Scalar;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_error::vortex_err;
19
20use crate::FSST;
21use crate::FSSTArray;
22
23impl TakeExecute for FSST {
24 fn take(
25 array: &FSSTArray,
26 indices: &ArrayRef,
27 _ctx: &mut ExecutionCtx,
28 ) -> VortexResult<Option<ArrayRef>> {
29 Ok(Some(
30 FSSTArray::try_new(
31 array
32 .dtype()
33 .clone()
34 .union_nullability(indices.dtype().nullability()),
35 array.symbols().clone(),
36 array.symbol_lengths().clone(),
37 VarBin::take(array.codes(), indices, _ctx)?
38 .vortex_expect("cannot fail")
39 .try_into::<VarBin>()
40 .map_err(|_| vortex_err!("take for codes must return varbin array"))?,
41 array
42 .uncompressed_lengths()
43 .take(indices.to_array())?
44 .fill_null(Scalar::zero_value(
45 &array.uncompressed_lengths_dtype().clone(),
46 ))?,
47 )?
48 .into_array(),
49 ))
50 }
51}
52
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::DynArray;
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::VarBinArray;
    use vortex_array::compute::conformance::consistency::test_array_consistency;
    use vortex_array::compute::conformance::take::test_take_conformance;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;

    use crate::FSSTArray;
    use crate::fsst_compress;
    use crate::fsst_train_compressor;

    /// Trains an FSST compressor on `varbin` and compresses `varbin` with it.
    ///
    /// Every fixture in this module is built this way. Keeping it in one place gives all
    /// tests the same `fsst_compress` calling convention.
    fn compress(varbin: &VarBinArray) -> FSSTArray {
        let compressor = fsst_train_compressor(varbin);
        fsst_compress(varbin, &compressor)
    }

    #[test]
    fn test_take_null() {
        let arr = VarBinArray::from_iter([Some("h")], DType::Utf8(Nullability::NonNullable));
        let fsst = compress(&arr);

        // Non-nullable indices keep the array's non-nullable dtype.
        let idx1: PrimitiveArray = (0..1).collect();
        assert_eq!(
            fsst.take(idx1.into_array()).unwrap().dtype(),
            &DType::Utf8(Nullability::NonNullable)
        );

        // Indices built from options make the result nullable, even with no actual null.
        let idx2: PrimitiveArray = PrimitiveArray::from_option_iter(vec![Some(0)]);
        assert_eq!(
            fsst.take(idx2.into_array()).unwrap().dtype(),
            &DType::Utf8(Nullability::Nullable)
        );
    }

    #[rstest]
    #[case(VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    #[case(VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    ))]
    #[case(VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    fn test_take_fsst_conformance(#[case] varbin: VarBinArray) {
        test_take_conformance(&compress(&varbin).into_array());
    }

    #[rstest]
    #[case::fsst_simple(compress(&VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_nullable(compress(&VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    )))]
    #[case::fsst_repetitive(compress(&VarBinArray::from_iter(
        ["http://example.com", "http://test.com", "http://vortex.dev", "http://data.org"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_single(compress(&VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_empty_strings(compress(&VarBinArray::from_iter(
        ["", "test", "", "hello", ""].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_large({
        // 1500 rows cycling through 10 templates, so the compressor sees plenty of repeats.
        let data: Vec<Option<&str>> = (0..1500)
            .map(|i| Some(match i % 10 {
                0 => "https://www.example.com/page",
                1 => "https://www.test.org/data",
                2 => "https://www.vortex.dev/docs",
                3 => "https://www.github.com/apache/arrow",
                4 => "https://www.rust-lang.org/learn",
                5 => "SELECT * FROM table WHERE id = ",
                6 => "INSERT INTO users (name, email) VALUES",
                7 => "UPDATE records SET status = 'active'",
                8 => "DELETE FROM logs WHERE timestamp < ",
                _ => "CREATE TABLE data (id INT, value TEXT)",
            }))
            .collect();
        compress(&VarBinArray::from_iter(data, DType::Utf8(Nullability::NonNullable)))
    })]
    fn test_fsst_consistency(#[case] array: FSSTArray) {
        test_array_consistency(&array.into_array());
    }
}