Skip to main content

vortex_fsst/compute/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod cast;
5mod compare;
6mod filter;
7
8use vortex_array::ArrayRef;
9use vortex_array::DynArray;
10use vortex_array::ExecutionCtx;
11use vortex_array::IntoArray;
12use vortex_array::arrays::VarBin;
13use vortex_array::arrays::dict::TakeExecute;
14use vortex_array::builtins::ArrayBuiltins;
15use vortex_array::scalar::Scalar;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_error::vortex_err;
19
20use crate::FSST;
21use crate::FSSTArray;
22
23impl TakeExecute for FSST {
24    fn take(
25        array: &FSSTArray,
26        indices: &ArrayRef,
27        _ctx: &mut ExecutionCtx,
28    ) -> VortexResult<Option<ArrayRef>> {
29        Ok(Some(
30            FSSTArray::try_new(
31                array
32                    .dtype()
33                    .clone()
34                    .union_nullability(indices.dtype().nullability()),
35                array.symbols().clone(),
36                array.symbol_lengths().clone(),
37                VarBin::take(array.codes(), indices, _ctx)?
38                    .vortex_expect("cannot fail")
39                    .try_into::<VarBin>()
40                    .map_err(|_| vortex_err!("take for codes must return varbin array"))?,
41                array
42                    .uncompressed_lengths()
43                    .take(indices.to_array())?
44                    .fill_null(Scalar::zero_value(
45                        &array.uncompressed_lengths_dtype().clone(),
46                    ))?,
47            )?
48            .into_array(),
49        ))
50    }
51}
52
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_array::DynArray;
    use vortex_array::IntoArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::VarBinArray;
    use vortex_array::compute::conformance::consistency::test_array_consistency;
    use vortex_array::compute::conformance::take::test_take_conformance;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;

    use crate::FSSTArray;
    use crate::fsst_compress;
    use crate::fsst_train_compressor;

    /// Trains an FSST compressor on `varbin` and compresses `varbin` with it.
    fn compress(varbin: &VarBinArray) -> FSSTArray {
        let compressor = fsst_train_compressor(varbin);
        fsst_compress(varbin, &compressor)
    }

    /// Checks result nullability of `take`, including a genuinely null index.
    #[test]
    fn test_take_null() {
        let fsst = compress(&VarBinArray::from_iter(
            [Some("h")],
            DType::Utf8(Nullability::NonNullable),
        ));

        // Non-nullable indices keep the array's (non-nullable) dtype.
        let idx1: PrimitiveArray = (0..1).collect();
        assert_eq!(
            fsst.take(idx1.into_array()).unwrap().dtype(),
            &DType::Utf8(Nullability::NonNullable)
        );

        // Nullable indices widen the result to nullable, even when no index is null.
        let idx2: PrimitiveArray = PrimitiveArray::from_option_iter(vec![Some(0)]);
        assert_eq!(
            fsst.take(idx2.into_array()).unwrap().dtype(),
            &DType::Utf8(Nullability::Nullable)
        );

        // An actual null index must still yield a nullable result of the right length.
        let idx3: PrimitiveArray = PrimitiveArray::from_option_iter(vec![Some(0), None]);
        let taken = fsst.take(idx3.into_array()).unwrap();
        assert_eq!(taken.dtype(), &DType::Utf8(Nullability::Nullable));
        assert_eq!(taken.len(), 2);
    }

    #[rstest]
    #[case(VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    #[case(VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    ))]
    #[case(VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    ))]
    fn test_take_fsst_conformance(#[case] varbin: VarBinArray) {
        test_take_conformance(&compress(&varbin).into_array());
    }

    #[rstest]
    // Basic string arrays
    #[case::fsst_simple(compress(&VarBinArray::from_iter(
        ["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    // Nullable strings
    #[case::fsst_nullable(compress(&VarBinArray::from_iter(
        [Some("hello"), None, Some("world"), Some("test"), None],
        DType::Utf8(Nullability::Nullable),
    )))]
    // Repetitive patterns (good for FSST compression)
    #[case::fsst_repetitive(compress(&VarBinArray::from_iter(
        ["http://example.com", "http://test.com", "http://vortex.dev", "http://data.org"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    // Edge cases
    #[case::fsst_single(compress(&VarBinArray::from_iter(
        ["single element"].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    #[case::fsst_empty_strings(compress(&VarBinArray::from_iter(
        ["", "test", "", "hello", ""].map(Some),
        DType::Utf8(Nullability::NonNullable),
    )))]
    // Large arrays
    #[case::fsst_large({
        let data: Vec<Option<&str>> = (0..1500)
            .map(|i| Some(match i % 10 {
                0 => "https://www.example.com/page",
                1 => "https://www.test.org/data",
                2 => "https://www.vortex.dev/docs",
                3 => "https://www.github.com/apache/arrow",
                4 => "https://www.rust-lang.org/learn",
                5 => "SELECT * FROM table WHERE id = ",
                6 => "INSERT INTO users (name, email) VALUES",
                7 => "UPDATE records SET status = 'active'",
                8 => "DELETE FROM logs WHERE timestamp < ",
                _ => "CREATE TABLE data (id INT, value TEXT)",
            }))
            .collect();
        compress(&VarBinArray::from_iter(data, DType::Utf8(Nullability::NonNullable)))
    })]
    fn test_fsst_consistency(#[case] array: FSSTArray) {
        test_array_consistency(&array.into_array());
    }
}