Skip to main content

vortex_fsst/compute/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod cast;
5mod compare;
6mod filter;
7mod like;
8
9use vortex_array::ArrayRef;
10use vortex_array::DynArray;
11use vortex_array::ExecutionCtx;
12use vortex_array::IntoArray;
13use vortex_array::arrays::VarBin;
14use vortex_array::arrays::dict::TakeExecute;
15use vortex_array::builtins::ArrayBuiltins;
16use vortex_array::scalar::Scalar;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_err;
20
21use crate::FSST;
22use crate::FSSTArray;
23
24impl TakeExecute for FSST {
25    fn take(
26        array: &FSSTArray,
27        indices: &ArrayRef,
28        _ctx: &mut ExecutionCtx,
29    ) -> VortexResult<Option<ArrayRef>> {
30        Ok(Some(
31            FSSTArray::try_new(
32                array
33                    .dtype()
34                    .clone()
35                    .union_nullability(indices.dtype().nullability()),
36                array.symbols().clone(),
37                array.symbol_lengths().clone(),
38                VarBin::take(array.codes(), indices, _ctx)?
39                    .vortex_expect("cannot fail")
40                    .try_into::<VarBin>()
41                    .map_err(|_| vortex_err!("take for codes must return varbin array"))?,
42                array
43                    .uncompressed_lengths()
44                    .take(indices.to_array())?
45                    .fill_null(Scalar::zero_value(
46                        &array.uncompressed_lengths_dtype().clone(),
47                    ))?,
48            )?
49            .into_array(),
50        ))
51    }
52}
53
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::DynArray;
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::VarBinArray;
    use vortex_array::compute::conformance::consistency::test_array_consistency;
    use vortex_array::compute::conformance::take::test_take_conformance;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;

    use crate::FSSTArray;
    use crate::fsst_compress;
    use crate::fsst_train_compressor;

    /// Trains an FSST compressor on `varbin` and compresses `varbin` with it.
    fn compress(varbin: &VarBinArray) -> FSSTArray {
        let compressor = fsst_train_compressor(varbin);
        fsst_compress(varbin, &compressor)
    }

    /// Checks the nullability of a take from a non-nullable FSST array.
    ///
    /// The result stays non-nullable for non-nullable indices and becomes nullable when
    /// the index array is nullable.
    #[test]
    fn test_take_null() {
        let arr = VarBinArray::from_iter([Some("h")], DType::Utf8(Nullability::NonNullable));
        let fsst = compress(&arr);

        let non_nullable_idx: PrimitiveArray = (0..1).collect();
        assert_eq!(
            fsst.take(non_nullable_idx.into_array()).unwrap().dtype(),
            &DType::Utf8(Nullability::NonNullable)
        );

        let nullable_idx = PrimitiveArray::from_option_iter([Some(0)]);
        assert_eq!(
            fsst.take(nullable_idx.into_array()).unwrap().dtype(),
            &DType::Utf8(Nullability::Nullable)
        );
    }

    /// Runs the shared take conformance suite against FSST-compressed inputs.
    #[rstest]
    #[case(VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    #[case(VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    ))]
    #[case(VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    fn test_take_fsst_conformance(#[case] varbin: VarBinArray) {
        test_take_conformance(&compress(&varbin).into_array());
    }

    /// Runs the shared consistency suite against a variety of FSST-compressed inputs.
    #[rstest]
    // Basic string arrays
    #[case::fsst_simple(compress(&VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    // Nullable strings
    #[case::fsst_nullable(compress(&VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    )))]
    // Repetitive patterns (good for FSST compression)
    #[case::fsst_repetitive(compress(&VarBinArray::from_iter(
        ["http://example.com", "http://test.com", "http://vortex.dev", "http://data.org"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    // Edge cases
    #[case::fsst_single(compress(&VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_empty_strings(compress(&VarBinArray::from_iter(
        ["", "test", "", "hello", ""].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    // Large arrays: 1500 rows cycling through ten URL / SQL patterns
    #[case::fsst_large({
        let patterns = [
            "https://www.example.com/page",
            "https://www.test.org/data",
            "https://www.vortex.dev/docs",
            "https://www.github.com/apache/arrow",
            "https://www.rust-lang.org/learn",
            "SELECT * FROM table WHERE id = ",
            "INSERT INTO users (name, email) VALUES",
            "UPDATE records SET status = 'active'",
            "DELETE FROM logs WHERE timestamp < ",
            "CREATE TABLE data (id INT, value TEXT)",
        ];
        let data: Vec<Option<&str>> = (0..1500)
            .map(|i| Some(patterns[i % patterns.len()]))
            .collect();
        compress(&VarBinArray::from_iter(data, DType::Utf8(Nullability::NonNullable)))
    })]
    fn test_fsst_consistency(#[case] array: FSSTArray) {
        test_array_consistency(&array.into_array());
    }
}