vortex_fsst/compute/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod cast;
5mod compare;
6mod filter;
7
8use vortex_array::Array;
9use vortex_array::ArrayRef;
10use vortex_array::IntoArray;
11use vortex_array::arrays::VarBinVTable;
12use vortex_array::compute::TakeKernel;
13use vortex_array::compute::TakeKernelAdapter;
14use vortex_array::compute::fill_null;
15use vortex_array::compute::take;
16use vortex_array::register_kernel;
17use vortex_error::VortexResult;
18use vortex_scalar::Scalar;
19use vortex_scalar::ScalarValue;
20
21use crate::FSSTArray;
22use crate::FSSTVTable;
23
24impl TakeKernel for FSSTVTable {
25    // Take on an FSSTArray is a simple take on the codes array.
26    fn take(&self, array: &FSSTArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
27        Ok(FSSTArray::try_new(
28            array
29                .dtype()
30                .clone()
31                .union_nullability(indices.dtype().nullability()),
32            array.symbols().clone(),
33            array.symbol_lengths().clone(),
34            take(array.codes().as_ref(), indices)?
35                .as_::<VarBinVTable>()
36                .clone(),
37            fill_null(
38                &take(array.uncompressed_lengths(), indices)?,
39                &Scalar::new(
40                    array.uncompressed_lengths_dtype().clone(),
41                    ScalarValue::from(0),
42                ),
43            )?,
44        )?
45        .into_array())
46    }
47}
48
// Wraps `FSSTVTable` in a `TakeKernelAdapter`, lifts it, and hands the result to
// `register_kernel!`.
// NOTE(review): presumably this is what makes the `TakeKernel` impl in this file
// reachable from the generic `take` compute function — confirm against the
// macro's documentation.
register_kernel!(TakeKernelAdapter(FSSTVTable).lift());
50
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::VarBinArray;
    use vortex_array::compute::conformance::consistency::test_array_consistency;
    use vortex_array::compute::conformance::take::test_take_conformance;
    use vortex_array::compute::take;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;

    use crate::FSSTArray;
    use crate::fsst_compress;
    use crate::fsst_train_compressor;

    /// Strings repeated cyclically to build the `fsst_large` consistency case.
    const LARGE_CASE_PATTERNS: [&str; 10] = [
        "https://www.example.com/page",
        "https://www.test.org/data",
        "https://www.vortex.dev/docs",
        "https://www.github.com/apache/arrow",
        "https://www.rust-lang.org/learn",
        "SELECT * FROM table WHERE id = ",
        "INSERT INTO users (name, email) VALUES",
        "UPDATE records SET status = 'active'",
        "DELETE FROM logs WHERE timestamp < ",
        "CREATE TABLE data (id INT, value TEXT)",
    ];

    /// Trains a compressor on `varbin` and FSST-compresses that same array with it.
    fn compress(varbin: VarBinArray) -> FSSTArray {
        let compressor = fsst_train_compressor(&varbin);
        fsst_compress(&varbin, &compressor)
    }

    /// The dtype produced by `take` follows the nullability of the indices:
    /// non-nullable indices keep a non-nullable result, nullable indices yield a
    /// nullable one.
    #[test]
    fn test_take_null() {
        let fsst = compress(VarBinArray::from_iter(
            [Some("h")],
            DType::Utf8(Nullability::NonNullable),
        ));

        let plain_indices: PrimitiveArray = (0..1).collect();
        let taken = take(fsst.as_ref(), plain_indices.as_ref()).unwrap();
        assert_eq!(taken.dtype(), &DType::Utf8(Nullability::NonNullable));

        let optional_indices: PrimitiveArray = PrimitiveArray::from_option_iter(vec![Some(0)]);
        let taken = take(fsst.as_ref(), optional_indices.as_ref()).unwrap();
        assert_eq!(taken.dtype(), &DType::Utf8(Nullability::Nullable));
    }

    /// Compresses each case and passes the result to `test_take_conformance`.
    #[rstest]
    #[case(VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    #[case(VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    ))]
    #[case(VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    fn test_take_fsst_conformance(#[case] varbin: VarBinArray) {
        let array = compress(varbin);
        test_take_conformance(array.as_ref());
    }

    /// Passes each compressed case to `test_array_consistency`.
    #[rstest]
    // Basic string arrays
    #[case::fsst_simple(compress(VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    // Nullable strings
    #[case::fsst_nullable(compress(VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    )))]
    // Repetitive patterns (good for FSST compression)
    #[case::fsst_repetitive(compress(VarBinArray::from_iter(
        ["http://example.com", "http://test.com", "http://vortex.dev", "http://data.org"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    // Edge cases
    #[case::fsst_single(compress(VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_empty_strings(compress(VarBinArray::from_iter(
        ["", "test", "", "hello", ""].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    // Large arrays
    #[case::fsst_large({
        // 1500 entries that walk the pattern table in order, wrapping around.
        let data: Vec<Option<&str>> = LARGE_CASE_PATTERNS
            .iter()
            .copied()
            .cycle()
            .take(1500)
            .map(Some)
            .collect();
        compress(VarBinArray::from_iter(
            data,
            DType::Utf8(Nullability::NonNullable),
        ))
    })]
    fn test_fsst_consistency(#[case] array: FSSTArray) {
        test_array_consistency(array.as_ref());
    }
}