1mod cast;
5mod compare;
6mod filter;
7
8use vortex_array::arrays::VarBinVTable;
9use vortex_array::compute::{TakeKernel, TakeKernelAdapter, fill_null, take};
10use vortex_array::{Array, ArrayRef, IntoArray, register_kernel};
11use vortex_error::VortexResult;
12use vortex_scalar::{Scalar, ScalarValue};
13
14use crate::{FSSTArray, FSSTVTable};
15
16impl TakeKernel for FSSTVTable {
17 fn take(&self, array: &FSSTArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
19 Ok(FSSTArray::try_new(
20 array
21 .dtype()
22 .clone()
23 .union_nullability(indices.dtype().nullability()),
24 array.symbols().clone(),
25 array.symbol_lengths().clone(),
26 take(array.codes().as_ref(), indices)?
27 .as_::<VarBinVTable>()
28 .clone(),
29 fill_null(
30 &take(array.uncompressed_lengths(), indices)?,
31 &Scalar::new(
32 array.uncompressed_lengths_dtype().clone(),
33 ScalarValue::from(0),
34 ),
35 )?,
36 )?
37 .into_array())
38 }
39}
40
41register_kernel!(TakeKernelAdapter(FSSTVTable).lift());
42
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::arrays::{PrimitiveArray, VarBinArray};
    use vortex_array::compute::conformance::consistency::test_array_consistency;
    use vortex_array::compute::conformance::take::test_take_conformance;
    use vortex_array::compute::take;
    use vortex_dtype::{DType, Nullability};

    use crate::{FSSTArray, fsst_compress, fsst_train_compressor};

    /// Trains a compressor on `strings` and returns `strings` compressed with it.
    fn fsst_encode(strings: VarBinArray) -> FSSTArray {
        let compressor = fsst_train_compressor(&strings);
        fsst_compress(&strings, &compressor)
    }

    /// The dtype of a take result is nullable exactly when the indices are nullable
    /// (for a non-nullable source array).
    #[test]
    fn test_take_null() {
        let fsst = fsst_encode(VarBinArray::from_iter(
            [Some("h")],
            DType::Utf8(Nullability::NonNullable),
        ));

        // Non-nullable indices keep the result non-nullable.
        let non_nullable_indices: PrimitiveArray = (0..1).collect();
        let taken = take(fsst.as_ref(), non_nullable_indices.as_ref()).unwrap();
        assert_eq!(taken.dtype(), &DType::Utf8(Nullability::NonNullable));

        // Nullable indices make the result nullable, even when no index is actually null.
        let nullable_indices: PrimitiveArray = PrimitiveArray::from_option_iter(vec![Some(0)]);
        let taken = take(fsst.as_ref(), nullable_indices.as_ref()).unwrap();
        assert_eq!(taken.dtype(), &DType::Utf8(Nullability::Nullable));
    }

    /// Runs the shared take conformance suite against FSST-compressed inputs.
    #[rstest]
    #[case(VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"]
            .map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    #[case(VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    ))]
    #[case(VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    fn test_take_fsst_conformance(#[case] varbin: VarBinArray) {
        let array = fsst_encode(varbin);
        test_take_conformance(array.as_ref());
    }

    /// Runs the shared array consistency suite against a variety of FSST arrays.
    #[rstest]
    #[case::fsst_simple(fsst_encode(VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"]
            .map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_nullable(fsst_encode(VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    )))]
    #[case::fsst_repetitive(fsst_encode(VarBinArray::from_iter(
        ["http://example.com", "http://test.com", "http://vortex.dev", "http://data.org"]
            .map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_single(fsst_encode(VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_empty_strings(fsst_encode(VarBinArray::from_iter(
        ["", "test", "", "hello", ""].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_large({
        // Ten string templates repeated round-robin to build 1500 elements.
        const PATTERNS: [&str; 10] = [
            "https://www.example.com/page",
            "https://www.test.org/data",
            "https://www.vortex.dev/docs",
            "https://www.github.com/apache/arrow",
            "https://www.rust-lang.org/learn",
            "SELECT * FROM table WHERE id = ",
            "INSERT INTO users (name, email) VALUES",
            "UPDATE records SET status = 'active'",
            "DELETE FROM logs WHERE timestamp < ",
            "CREATE TABLE data (id INT, value TEXT)",
        ];
        let data: Vec<Option<&str>> = (0..1500usize)
            .map(|i| Some(PATTERNS[i % PATTERNS.len()]))
            .collect();
        fsst_encode(VarBinArray::from_iter(
            data,
            DType::Utf8(Nullability::NonNullable),
        ))
    })]
    fn test_fsst_consistency(#[case] array: FSSTArray) {
        test_array_consistency(array.as_ref());
    }
}