1mod cast;
5mod compare;
6mod filter;
7mod like;
8
9use vortex_array::ArrayRef;
10use vortex_array::ArrayView;
11use vortex_array::ExecutionCtx;
12use vortex_array::IntoArray;
13use vortex_array::arrays::VarBin;
14use vortex_array::arrays::dict::TakeExecute;
15use vortex_array::builtins::ArrayBuiltins;
16use vortex_array::scalar::Scalar;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_err;
20
21use crate::FSST;
22use crate::FSSTArrayExt;
23
24impl TakeExecute for FSST {
25 fn take(
26 array: ArrayView<'_, Self>,
27 indices: &ArrayRef,
28 ctx: &mut ExecutionCtx,
29 ) -> VortexResult<Option<ArrayRef>> {
30 Ok(Some(
31 FSST::try_new(
32 array
33 .dtype()
34 .clone()
35 .union_nullability(indices.dtype().nullability()),
36 array.symbols().clone(),
37 array.symbol_lengths().clone(),
38 {
39 let codes = array.codes();
40 let codes = codes.as_view();
41 <VarBin as TakeExecute>::take(codes, indices, ctx)?
42 .vortex_expect("VarBin take kernel always returns Some")
43 }
44 .try_downcast::<VarBin>()
45 .map_err(|_| vortex_err!("take for codes must return varbin array"))?,
46 array
47 .uncompressed_lengths()
48 .take(indices.clone())?
49 .fill_null(Scalar::zero_value(
50 &array.uncompressed_lengths_dtype().clone(),
51 ))?,
52 ctx,
53 )?
54 .into_array(),
55 ))
56 }
57}
58
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::ExecutionCtx;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::VarBinArray;
    use vortex_array::compute::conformance::consistency::test_array_consistency;
    use vortex_array::compute::conformance::take::test_take_conformance;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;

    use crate::FSSTArray;
    use crate::fsst_compress;
    use crate::fsst_train_compressor;

    /// The dtype of a take result follows the indices: indices collected from a plain
    /// range keep a non-nullable source non-nullable, while indices built from an
    /// option iterator make the result nullable.
    #[test]
    fn test_take_null() {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let source = VarBinArray::from_iter([Some("h")], DType::Utf8(Nullability::NonNullable));
        let trained = fsst_train_compressor(&source);
        let fsst = fsst_compress(&source, source.len(), source.dtype(), &trained, &mut ctx);

        let plain_indices: PrimitiveArray = (0..1).collect();
        let taken_plain = fsst.take(plain_indices.into_array()).unwrap();
        assert_eq!(taken_plain.dtype(), &DType::Utf8(Nullability::NonNullable));

        let optional_indices: PrimitiveArray = PrimitiveArray::from_option_iter(vec![Some(0)]);
        let taken_optional = fsst.take(optional_indices.into_array()).unwrap();
        assert_eq!(taken_optional.dtype(), &DType::Utf8(Nullability::Nullable));
    }

    /// Runs the shared take conformance suite against FSST-compressed inputs.
    #[rstest]
    #[case(VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    #[case(VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    ))]
    #[case(VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    fn test_take_fsst_conformance(#[case] varbin: VarBinArray) {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let trained = fsst_train_compressor(&varbin);
        let compressed = fsst_compress(&varbin, varbin.len(), varbin.dtype(), &trained, &mut ctx)
            .into_array();
        test_take_conformance(&compressed);
    }

    /// Builds an FSST array for a consistency case using the supplied execution context.
    type FsstBuilder = fn(&mut ExecutionCtx) -> FSSTArray;

    /// Runs the shared array consistency suite against a variety of FSST arrays.
    #[rstest]
    #[case::fsst_simple(|ctx: &mut ExecutionCtx| {
        // A handful of distinct, non-null strings.
        let strings = VarBinArray::from_iter(
            ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
            DType::Utf8(Nullability::NonNullable),
        );
        let trained = fsst_train_compressor(&strings);
        fsst_compress(&strings, strings.len(), strings.dtype(), &trained, ctx)
    })]
    #[case::fsst_nullable(|ctx: &mut ExecutionCtx| {
        // Nullable strings with nulls interleaved between values.
        let strings = VarBinArray::from_iter(
            [Some("hello"), None, Some("world"), Some("test"), None],
            DType::Utf8(Nullability::Nullable),
        );
        let trained = fsst_train_compressor(&strings);
        let element_count = strings.len();
        let element_dtype = strings.dtype().clone();
        fsst_compress(strings, element_count, &element_dtype, &trained, ctx)
    })]
    #[case::fsst_repetitive(|ctx: &mut ExecutionCtx| {
        // Strings sharing a common prefix.
        let strings = VarBinArray::from_iter(
            ["http://example.com", "http://test.com", "http://vortex.dev", "http://data.org"].map(Some),
            DType::Utf8(Nullability::NonNullable),
        );
        let trained = fsst_train_compressor(&strings);
        fsst_compress(&strings, strings.len(), strings.dtype(), &trained, ctx)
    })]
    #[case::fsst_single(|ctx: &mut ExecutionCtx| {
        // A single-element array.
        let strings = VarBinArray::from_iter(
            ["single element"].map(Some),
            DType::Utf8(Nullability::NonNullable),
        );
        let trained = fsst_train_compressor(&strings);
        fsst_compress(&strings, strings.len(), strings.dtype(), &trained, ctx)
    })]
    #[case::fsst_empty_strings(|ctx: &mut ExecutionCtx| {
        // Empty strings mixed with non-empty ones.
        let strings = VarBinArray::from_iter(
            ["", "test", "", "hello", ""].map(Some),
            DType::Utf8(Nullability::NonNullable),
        );
        let trained = fsst_train_compressor(&strings);
        let element_count = strings.len();
        let element_dtype = strings.dtype().clone();
        fsst_compress(strings, element_count, &element_dtype, &trained, ctx)
    })]
    #[case::fsst_large(|ctx: &mut ExecutionCtx| {
        // 1500 entries that repeat the ten patterns below in order.
        let patterns = [
            "https://www.example.com/page",
            "https://www.test.org/data",
            "https://www.vortex.dev/docs",
            "https://www.github.com/apache/arrow",
            "https://www.rust-lang.org/learn",
            "SELECT * FROM table WHERE id = ",
            "INSERT INTO users (name, email) VALUES",
            "UPDATE records SET status = 'active'",
            "DELETE FROM logs WHERE timestamp < ",
            "CREATE TABLE data (id INT, value TEXT)",
        ];
        let data: Vec<Option<&str>> = patterns
            .iter()
            .copied()
            .cycle()
            .take(1500)
            .map(Some)
            .collect();
        let strings = VarBinArray::from_iter(data, DType::Utf8(Nullability::NonNullable));
        let trained = fsst_train_compressor(&strings);
        let element_count = strings.len();
        let element_dtype = strings.dtype().clone();
        fsst_compress(strings, element_count, &element_dtype, &trained, ctx)
    })]
    fn test_fsst_consistency(#[case] build: FsstBuilder) {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let compressed = build(&mut ctx).into_array();
        test_array_consistency(&compressed);
    }
}